Skip to content

Ircama/pysnmp-sync-adapter

Repository files navigation

pysnmp-sync-adapter

PyPI PyPI download month

Lightweight Synchronous Adapter for PySNMP AsyncIO HLAPI


This package provides lightweight, blocking wrappers around the pysnmp.hlapi.v1arch.asyncio and pysnmp.hlapi.v3arch.asyncio modules of PySNMP, enabling synchronous use of the SNMP high-level API in PySNMP v7+ without requiring direct asyncio management. It preserves the flexibility of PySNMP’s asyncio-based architecture and includes a compatibility layer for various legacy interfaces. The two additional functions parallel_get_sync and cluster_varbinds enable efficient, high-performance concurrent SNMP queries in a blocking context.

Features

  • Drop-in synchronous alternatives to PySNMP's async-HLAPI: get_cmd_sync, next_cmd_sync, set_cmd_sync, bulk_cmd_sync, walk_cmd_sync, bulk_walk_cmd_sync.
  • Supports both v1arch and v3arch PySNMP v7+ architectures, automatically selected or configurable via the PYSNMP_ARCH environment variable.
  • Supports both IPv4 and IPv6 transport targets via UdpTransportTarget and Udp6TransportTarget.
  • Reuses or creates the default shared event loop (asyncio.get_event_loop()), ensuring integration efficiency.
  • Sync wrappers accept an optional timeout parameter (in seconds) that limits the total execution time using asyncio.wait_for().
  • Minimizes connection overhead by reusing pre-created transport instances when calling create_transport().
  • The add-on parallel_get_sync function executes multiple SNMP GET requests concurrently in a blocking context. It is complemented by cluster_varbinds, an ancillary utility that normalizes and clusters OIDs into ordered chunks for efficient batching. Together, they enable high-throughput querying of large OID sets without requiring asyncio, reducing per-PDU overhead while preserving request order.
  • In addition, through the pysnmp_sync_adapter.legacy_wrappers compatibility layer, it supports:
  • Also, through the pysnmp_sync_adapter.cmdgen_wrappers compatibility layer, it runs code written against the cmdgen SNMP library appearing in pysnmp.entity.rfc3413.oneliner.

These adapters allow to call the familiar HLAPI functions in a purely synchronous style (e.g. in scripts, GUIs like Tkinter, or blocking contexts) without having to manage asyncio directly. This restores the synchronous experience familiar from earlier PySNMP versions. Native sync HLAPI wrappers were deprecated in recent releases in favor of asyncio.

Provided Methods

Synchronous Function AsyncIO Equivalent
get_cmd_sync get_cmd
next_cmd_sync next_cmd
set_cmd_sync set_cmd
bulk_cmd_sync bulk_cmd
walk_cmd_sync walk_cmd (async-gen)
bulk_walk_cmd_sync bulk_walk_cmd (async-gen)
create_transport Synchronously awaits the async create() factory on UdpTransportTarget and Udp6TransportTarget
Add-on Function Description
parallel_get_sync Parallel get_cmd
cluster_varbinds Normalize and cluster OID lists

Quick Start

Using v1arch:

from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import get_cmd_sync, create_transport

err, status, index, var_binds = get_cmd_sync(
    SnmpDispatcher(),
    CommunityData('public', mpModel=0),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)

for name, val in var_binds:
    print(f'{name} = {val}')

Using v3arch:

from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter import get_cmd_sync, create_transport

err, status, index, var_binds = get_cmd_sync(
    SnmpEngine(),
    CommunityData('public', mpModel=0),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)

for name, val in var_binds:
    print(f'{name} = {val}')

parallel_get_sync

Executes a batch of SNMP GET requests concurrently.

parallel_get_sync is an add-on function introduced by pysnmp-sync-adapter which efficiently dispatches multiple SNMP GET operations in parallel while preserving optimal PDU packing and result ordering, allowing timeout and throttling.

def parallel_get_sync(
    snmp_engine,
    auth_data,
    transport_target,
    *pdu_args,
    queries: list,
    timeout: float = None,
    max_parallel: int = None,
    **pdu_kwargs
) -> list[tuple]:

Args:

  snmp_engine      – SnmpDispatcher() or SnmpEngine() instance
  auth_data        – CommunityData or UsmUserData
  transport_target – transport returned by create_transport()
  *pdu_args        – positional args before var-binds (e.g. ContextData for v3arch)
  queries          – list of either:
                      • ObjectType(...)  — each sent in its own PDU  
                      • [ObjectType(...), …] — grouped into a single PDU  
  timeout          – optional float, total seconds to wait for all requests
  max_parallel=10, - optional throttle
  **pdu_kwargs     – extra get_cmd() keyword args (e.g. lookupMib=False)

Returns a list of (errorIndication, errorStatus, errorIndex, varBinds) tuples in the same order as the queries list.

Behavior:

  1. Single ObjectType entries each become one PDU and run in parallel.
  2. Sub-lists of ObjectType are packed into one multi-OID PDU each.
  3. All PDUs fire concurrently with timeout and throttle via asyncio.gather().
  4. Results are returned in order, matching queries.
  5. Optional timeout wraps the entire gather with asyncio.wait_for().
  6. Optional max_parallel throttles how many SNMP PDUs are fired in parallel, so the agent is not overwhelmed.

Example (v1arch):

from pysnmp.hlapi.v1arch.asyncio import (
    SnmpDispatcher, CommunityData, UdpTransportTarget,
    ObjectType, ObjectIdentity
)
from pysnmp_sync_adapter import create_transport, parallel_get_sync

engine    = SnmpDispatcher()
community = CommunityData("public", mpModel=0)
transport = create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

queries = [
  ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")),             # single-OID
  [
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.3.0")),
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.5.0"))
  ],                                                           # grouped OIDs
  ObjectType(ObjectIdentity("1.3.6.1.2.1.2.2.1.2.1"))          # another single-OID
]

results = parallel_get_sync(
  engine,
  community,
  transport,
  queries=queries,  # keyword-only argument
  timeout=5,
  lookupMib=False
)

for errInd, errStat, errIdx, varBinds in results:
    if errInd or errStat:
        print("Error:", errInd or errStat)
    else:
        for oid, val in varBinds:
            print(f"{oid} = {val}")

Example (v3arch):

from pysnmp.hlapi.v3arch.asyncio import (
    SnmpDispatcher, UsmUserData, ContextData,
    UdpTransportTarget, ObjectType, ObjectIdentity
)
from pysnmp_sync_adapter import create_transport, parallel_get_sync

engine    = SnmpDispatcher()
auth      = UsmUserData("usr", authKey=b"abc", privKey=b"xyz", mpModel=3)
context   = ContextData()
transport = create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

results = parallel_get_sync(
  engine,
  auth,
  transport,
  context,
  queries=[
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")),
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.3.0"))
  ],
  timeout=5
)

cluster_varbinds

cluster_varbinds is an ancillary function of that normalizes and clusters queries into flat chunks of up to max_per_pdu var-binds per PDU.

It converts a list of OIDs into ordered sublists, each representing one SNMP PDU with no more than max_per_pdu var-binds. This enables efficient batching and parallel dispatch via parallel_get_sync, improving throughput by reducing per-PDU overhead while preserving request order.

Depending on the use case, the performance improvement can be significant.

Example of usage:

raw_queries = [
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
    [
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.2.0')),
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.3.0'))
    ],
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.4.0'))
]

wrapped_queries = [
    [ ObjectType(ObjectIdentity(x)) for x in group ]
    for group in raw_queries
]

wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=10)  # this can get relevant performance improvement

raw_results = parallel_get_sync(
    engine,
    auth,
    transport,
    queries=wrapped_queries,
    max_parallel=5
)

Description:

def cluster_varbinds(
    queries: Sequence[Union[ObjectType, Sequence[ObjectType]]],
    max_per_pdu: int
) -> List[List[ObjectType]]:

Parameters:

  • queries (Sequence[ObjectType] or Sequence[Sequence[ObjectType]]]): A mixed list where each element is either a single ObjectType or a list/tuple of them.
  • max_per_pdu (int): Maximum number of var-binds to include in each PDU. Must be >= 1.

Returns:

  • List[List[ObjectType]]: A list of flat sub-lists, each containing up to max_per_pdu ObjectType instances, preserving the original order.

Usage:

from pysnmp.hlapi import ObjectType, ObjectIdentity

# Prepare a mixed sequence of queries
raw_queries = [
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
    [
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.2.0')),
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.3.0'))
    ],
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.4.0'))
]

# Chunk into PDUs of max 2 var-binds each
pdus = cluster_varbinds(raw_queries, max_per_pdu=2)
# pdus == [
#   [OT('1.3.6.1.2.1.1.1.0'), OT('1.3.6.1.2.1.1.2.0')],
#   [OT('1.3.6.1.2.1.1.3.0'), OT('1.3.6.1.2.1.1.4.0')]
#]

create_transport

create_transport synchronously awaits the async create() factory on the given transport class UdpTransportTarget or Udp6TransportTarget and only passes timeout and retries if they are not None, returning a ready-to-use transport object.

def create_transport(
    transport_cls, *addr, timeout=None, retries=None, **other_kwargs
)

Example for IPv4:

create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

Example for IPv6:

create_transport(Udp6TransportTarget, ("2001:db8::1", 161), timeout=2)

Internal Utilities

  • ensure_loop() — Retrieves the current default event loop via asyncio.get_event_loop(), or creates and sets one if none exists. Ensures one loop is available per thread.
  • _sync_coro() — Executes a coroutine to completion on the shared event loop, with optional timeout support via asyncio.wait_for(). Handles already-running loops by scheduling a future.
  • _sync_agen() — Collects all items from an async generator (e.g., walk_cmd) into a list by internally awaiting it with _sync_coro().
  • make_sync() — Higher-order function that wraps PySNMP async-HLAPI coroutines into synchronous functions, propagating optional timeout arguments.

By avoiding per-call event loop instantiation and by reusing transport targets, this implementation significantly reduces runtime overhead in tight polling or query loops.


Installation

pip install pysnmp-sync-adapter

This will automatically install the latest version of pysnmp as a dependency, if it is not already present.

Usage

To ensure compatibility with the selected PySNMP architecture (v1arch or v3arch), make sure to import pysnmp.hlapi.v3arch.asyncio (or v1arch) before importing from pysnmp_sync_adapter. For example:

from pysnmp.hlapi.v3arch.asyncio import *  # Must come first (or v1arch)

from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)

This ensures that the adapter binds to the appropriate internal PySNMP modules. If omitted or imported in the wrong order, pysnmp_sync_adapter may fallback to v1arch even when v3arch is desired.

Alternatively, the environment variable PYSNMP_ARCH can be set to "v3arch" (or "v1arch"). Example:

import os
os.environ["PYSNMP_ARCH"] = "v3arch"  # or "v1arch"

from pysnmp_sync_adapter import get_cmd_sync  # etc.

This method is particularly useful in larger applications or testing scenarios where import order might be harder to control.

Note: When both "v1arch" and "v3arch" modules need to be used sequentially in the same program (unusual technique), it has been verified that purging the relevant modules before re-importing them should offer correct behavior:

for mod in list(sys.modules):
    if mod.startswith("pysnmp.hlapi.") or mod.startswith("pysnmp_sync_adapter"):
        del sys.modules[mod]

High-level v1arch sync

import asyncio
import platform
from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)


if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

community = "public"
dispatcher = SnmpDispatcher()
auth_data = CommunityData(community, mpModel=0)

print("\n--> get_cmd_sync")
error_indication, error_status, error_index, var_binds = get_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> set_cmd_sync")
error_indication, error_status, error_index, var_binds = set_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0), "Linux i386"),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> next_cmd_sync")
error_indication, error_status, error_index, var_binds = next_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_cmd_sync")
error_indication, error_status, error_index, var_binds = bulk_cmd_sync(
    dispatcher,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    0,
    2,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> walk_cmd_sync")
objects = walk_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_walk_cmd_sync")
objects = bulk_walk_cmd_sync(
    dispatcher,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    0,
    25,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

High-level v3arch sync

import asyncio
import platform
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)

if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

community = "public"
engine = SnmpEngine()

print("\n--> get_cmd_sync")
error_indication, error_status, error_index, var_binds = get_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> set_cmd_sync")
error_indication, error_status, error_index, var_binds = set_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0), "Linux i386"),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> next_cmd_sync")
error_indication, error_status, error_index, var_binds = next_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_cmd_sync")
error_indication, error_status, error_index, var_binds = bulk_cmd_sync(
    engine,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    0,
    2,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> walk_cmd_sync")
objects = walk_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_walk_cmd_sync")
objects = bulk_walk_cmd_sync(
    engine,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    0,
    25,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

Supporting other libraries

This adapter provides compatibility for code written against other libraries using synchronous SNMP commands. It allows legacy SNMPv1/v2c/v3 code to run unchanged for backward compatibility, while taking advantage of simplified synchronous operation.

Implements wrappers:

Function Description
getCmd(...) Yields a single (errInd, errStat, errIdx, varBinds) from get_cmd_sync
setCmd(...) Same as getCmd but for set_cmd_sync
nextCmd(...) Uses next_cmd_sync
bulkCmd(...) Uses bulk_cmd_sync
walkCmd(...) Uses walk_cmd_sync
bulkWalkCmd(...) Uses bulk_walk_cmd_sync
Udp6TransportTarget(...) Legacy-compatible wrapper
UdpTransportTarget(...) Legacy-compatible wrapper

These wrappers preserve the iterator-based usage of pysnmp.hlapi but operate using blocking, synchronous calls underneath.

Support of the Python SNMP library v5.0.24 HLAPI

Compatibility layer for code written against the Python SNMP library v5 HLAPI (https://github.com/pysnmp/pysnmp) using synchronous SNMP commands.

This library uses SnmpEngine() and ContextData(). It requires legacy_wrappers with v3arch.

Example Usage

from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter.legacy_wrappers import UdpTransportTarget, getCmd

for errorIndication, errorStatus, errorIndex, varBinds in getCmd(
    SnmpEngine(),
    CommunityData('public', mpModel=0),
    UdpTransportTarget(("demo.pysnmp.com", 161)),
    ContextData(),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
):
    if errorIndication:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    elif errorStatus:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    else:
        for name, val in varBinds:
            print(name, "=", val)

Support of legacy etingof/pysnmp v5 HLAPI

Compatibility layer for code written against the legacy etingof/pysnmp v5 HLAPI (https://github.com/etingof/pysnmp) using synchronous SNMP commands.

This library uses SnmpDispatcher() and does not use ContextData(). It requires legacy_wrappers with v1arch.

Example Usage

from pysnmp.hlapi.v1arch.asyncio import *
from pyasn1.type.univ import OctetString as OctetStringType
from pysnmp_sync_adapter.legacy_wrappers import UdpTransportTarget, getCmd

timeout = 2
retries = 2
iterator = getCmd(
    SnmpDispatcher(),
    CommunityData('public', mpModel=0),
    UdpTransportTarget(
        ("demo.pysnmp.com", 161),
        timeout,  # optional parameter
        retries  # optional parameter
    ),
    ('1.3.6.1.2.1.1.1.0', None)
)

for response in iterator:
    errorIndication, errorStatus, errorIndex, varBinds = response
    if errorIndication:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    elif errorStatus:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))

Notes

UdpTransportTarget

The adapter supports two legacy initialization forms:

UdpTransportTarget(("host", port), timeout, retries)
UdpTransportTarget(("host", port, timeout, retries))

Both forms correctly map to the underlying transport constructor, omitting timeout and retries if None.

errorIndication

If the error indication errorIndication is present, the returned message is a string.

Besides, if the message includes "before timeout", it will be augmented with " - timed out" for compatibility matching.

Support of the cmdgen SNMP library apearing in pysnmp oneliner

Compatibility layer for code written against the cmdgen SNMP library appearing in pysnmp.entity.rfc3413.oneliner, using synchronous SNMP commands.

It needs v3arch, transparently inserts the required SnmpEngine() and ContextData() parameters for SNMPv3 (v3arch.asyncio) calls, and wraps OID tuples in ObjectType(ObjectIdentity(...)). Legacy UDP and UDP6 transports, including timeout and retry arguments, are preserved.

Example Usage

from pysnmp.hlapi.v3arch.asyncio import *
import pysnmp_sync_adapter.cmdgen_wrappers as cmdgen
cmd_gen = cmdgen.CommandGenerator()
transport = cmdgen.UdpTransportTarget(("demo.pysnmp.com", 161), timeout=5, retries=1)
oid = '1.3.6.1.2.1.1.1.0'
oid_tuple = tuple(int(part) for part in oid.split('.'))
comm_data = cmdgen.CommunityData('public', mpModel=0)
error_indication, error_status, error_index, var_binds = cmd_gen.getCmd(
    comm_data,
    transport,
    oid_tuple
)
for name, val in var_binds:
    print(f'{name} = {val}')

Other example:

from pysnmp.hlapi.v3arch.asyncio import *
import pysnmp_sync_adapter.cmdgen_wrappers as cmdgen
_oids = ('1.3.6.1.2.1.1.1.0', '1.3.6.1.2.1.1.4.0',
         '1.3.6.1.2.1.1.5.0', '1.3.6.1.2.1.1.6.0')
user = 'myUser'
authKe = 'authPassword'
privKe = 'privPassword'
authProto = usmHMACSHAAuthProtocol
privProto = usmAesCfb128Protocol
cmdGen = cmdgen.CommandGenerator()
cmdGen.getCmd(
    cmdgen.UsmUserData(
        user, authKey=authKe, privKey=privKe,
        authProtocol=authProto, privProtocol=privProto
    ),
    cmdgen.UdpTransportTarget(("demo.pysnmp.com", 161)),
    *[_ for _ in (ObjectType(ObjectIdentity(oid)) for oid in _oids)]
)

Notes and limitations

  • These adapters block the calling thread until the SNMP operation completes.
  • They rely on the default asyncio event loop obtained via asyncio.get_event_loop(). If no loop is set, one is created and registered. They do not create isolated loops.
  • Since PySNMP uses the default event loop bound to the current thread, invoking these synchronous wrappers from a thread that is already running an event loop may cause deadlocks or RuntimeError. To use them safely in such environments, run them from a separate thread. Hybrid usage of both native async code and sync queries via the _sync API in the same application is strongly discouraged unless carefully managed, because triggering sync queries (_sync) while the asyncio event loop is running may cause undefined behavior or deadlocks.
  • A timeout (in seconds) can be optionally passed to all sync wrappers; it limits the total wall-clock time of the SNMP operation using asyncio.wait_for(). On timeout, asyncio.TimeoutError is raised.
  • The underlying transport layer’s timeouts (e.g. UdpTransportTarget(..., timeout=2)) still apply, and should be set appropriately to avoid low-level blocking.
  • These wrappers do not forcibly cancel low-level transport operations. A timeout interrupts the coroutine, but not the transport at the OS level.

This repository uses the public SNMP simulation service at demo.pysnmp.com, provided courtesy of Lextudio. Please ensure network access to demo.pysnmp.com:161 is available when running the tests (python -m pytest).

Reference documentation

License

EUPL-1.2 License - See LICENCE for details.

About

Synchronous Access to PySNMP Async HLAPI (High-level API)

Topics

Resources

License

Contributing

Security policy

Stars

Watchers

Forks

Packages

No packages published

Languages