Lightweight Synchronous Adapter for PySNMP AsyncIO HLAPI
This package provides lightweight, blocking wrappers around the `pysnmp.hlapi.v1arch.asyncio` and `pysnmp.hlapi.v3arch.asyncio` modules of PySNMP, enabling synchronous use of the SNMP high-level API in PySNMP v7+ without requiring direct asyncio management. It preserves the flexibility of PySNMP's asyncio-based architecture and includes a compatibility layer for various legacy interfaces. Two additional functions, `parallel_get_sync` and `cluster_varbinds`, enable efficient concurrent SNMP queries in a blocking context.
- Drop-in synchronous alternatives to PySNMP's async-HLAPI: `get_cmd_sync`, `next_cmd_sync`, `set_cmd_sync`, `bulk_cmd_sync`, `walk_cmd_sync`, `bulk_walk_cmd_sync`.
- Supports both v1arch and v3arch PySNMP v7+ architectures, automatically selected or configurable via the `PYSNMP_ARCH` environment variable.
- Supports both IPv4 and IPv6 transport targets via `UdpTransportTarget` and `Udp6TransportTarget`.
- Reuses or creates the default shared event loop (`asyncio.get_event_loop()`), ensuring integration efficiency.
- Sync wrappers accept an optional `timeout` parameter (in seconds) that limits the total execution time using `asyncio.wait_for()`.
- Minimizes connection overhead by reusing pre-created transport instances when calling `create_transport()`.
- The add-on `parallel_get_sync` function executes multiple SNMP GET requests concurrently in a blocking context. It is complemented by `cluster_varbinds`, an ancillary utility that normalizes and clusters OIDs into ordered chunks for efficient batching. Together, they enable high-throughput querying of large OID sets without requiring asyncio, reducing per-PDU overhead while preserving request order.
- In addition, through the `pysnmp_sync_adapter.legacy_wrappers` compatibility layer, it supports:
  - the Python SNMP library v5.0.24 HLAPI
  - the `etingof/pysnmp` v5 HLAPI
- Also, through the `pysnmp_sync_adapter.cmdgen_wrappers` compatibility layer, it runs code written against the `cmdgen` SNMP library appearing in `pysnmp.entity.rfc3413.oneliner`.

These adapters let you call the familiar HLAPI functions in a purely synchronous style (e.g. in scripts, GUIs like Tkinter, or other blocking contexts) without having to manage `asyncio` directly. This restores the synchronous experience familiar from earlier PySNMP versions, whose native sync HLAPI wrappers were deprecated in recent releases in favor of `asyncio`.

Synchronous Function | AsyncIO Equivalent |
---|---|
`get_cmd_sync` | `get_cmd` |
`next_cmd_sync` | `next_cmd` |
`set_cmd_sync` | `set_cmd` |
`bulk_cmd_sync` | `bulk_cmd` |
`walk_cmd_sync` | `walk_cmd` (async-gen) |
`bulk_walk_cmd_sync` | `bulk_walk_cmd` (async-gen) |
`create_transport` | Synchronously awaits the async `create()` factory on `UdpTransportTarget` and `Udp6TransportTarget` |

Add-on Function | Description |
---|---|
`parallel_get_sync` | Parallel `get_cmd` |
`cluster_varbinds` | Normalize and cluster OID lists |

Using v1arch:
from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import get_cmd_sync, create_transport

err, status, index, var_binds = get_cmd_sync(
    SnmpDispatcher(),
    CommunityData('public', mpModel=0),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)

for name, val in var_binds:
    print(f'{name} = {val}')

Using v3arch:
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter import get_cmd_sync, create_transport

err, status, index, var_binds = get_cmd_sync(
    SnmpEngine(),
    CommunityData('public', mpModel=0),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)

for name, val in var_binds:
    print(f'{name} = {val}')

Executes a batch of SNMP GET requests concurrently.
`parallel_get_sync` is an add-on function introduced by pysnmp-sync-adapter. It dispatches multiple SNMP GET operations in parallel while preserving PDU packing and result ordering, with optional timeout and throttling.
def parallel_get_sync(
    snmp_engine,
    auth_data,
    transport_target,
    *pdu_args,
    queries: list,
    timeout: float = None,
    max_parallel: int = None,
    **pdu_kwargs
) -> list[tuple]:
Args:
    snmp_engine – SnmpDispatcher() or SnmpEngine() instance
    auth_data – CommunityData or UsmUserData
    transport_target – transport returned by create_transport()
    *pdu_args – positional args before var-binds (e.g. ContextData for v3arch)
    queries – list of either:
        • ObjectType(...): each sent in its own PDU
        • [ObjectType(...), …]: grouped into a single PDU
    timeout – optional float, total seconds to wait for all requests
    max_parallel – optional int, throttle on the number of PDUs in flight at once (e.g. max_parallel=10)
    **pdu_kwargs – extra get_cmd() keyword args (e.g. lookupMib=False)
Returns a list of (errorIndication, errorStatus, errorIndex, varBinds) tuples in the same order as the queries list.
Behavior:
- Single `ObjectType` entries each become one PDU and run in parallel.
- Sub-lists of `ObjectType` are packed into one multi-OID PDU each.
- All PDUs fire concurrently via `asyncio.gather()`, subject to the optional timeout and throttle.
- Results are returned in order, matching `queries`.
- Optional `timeout` wraps the entire gather with `asyncio.wait_for()`.
- Optional `max_parallel` throttles how many SNMP PDUs are fired in parallel, so the agent is not overwhelmed.
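
The behavior listed above can be illustrated with plain asyncio and no network access. This is only a sketch of the general gather, throttle and timeout pattern, not the adapter's actual code: `fake_get`, `gather_throttled` and `parallel_get_sketch` are hypothetical names, the semaphore is an assumed way of implementing the throttle, and `asyncio.run()` is used here for self-containment instead of the shared event loop the adapter relies on.

```python
import asyncio


async def fake_get(pdu, delay=0.01):
    """Hypothetical stand-in for an SNMP GET coroutine (no network I/O)."""
    await asyncio.sleep(delay)
    # Same 4-tuple shape as the HLAPI result; var-binds are just echoed back.
    return (None, 0, 0, list(pdu))


async def gather_throttled(pdus, max_parallel=None):
    """Run one fake_get per PDU, at most max_parallel at a time."""
    sem = asyncio.Semaphore(max_parallel) if max_parallel else None

    async def run_one(pdu):
        if sem is None:
            return await fake_get(pdu)
        async with sem:  # assumed throttling mechanism
            return await fake_get(pdu)

    # gather() returns results in argument order,
    # regardless of which coroutine finishes first.
    return await asyncio.gather(*(run_one(p) for p in pdus))


def parallel_get_sketch(queries, timeout=None, max_parallel=None):
    """Blocking entry point: normalize queries, then gather under one timeout."""
    # A bare item becomes a one-element PDU; a list/tuple stays one PDU.
    pdus = [q if isinstance(q, (list, tuple)) else [q] for q in queries]
    coro = gather_throttled(pdus, max_parallel)
    return asyncio.run(asyncio.wait_for(coro, timeout))


results = parallel_get_sketch(["a", ["b", "c"], "d"], timeout=5, max_parallel=2)
for err_ind, err_stat, err_idx, var_binds in results:
    print(var_binds)
```

Strings stand in for `ObjectType` instances so that the sketch runs without PySNMP installed.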
Example (v1arch):
from pysnmp.hlapi.v1arch.asyncio import (
    SnmpDispatcher, CommunityData, UdpTransportTarget,
    ObjectType, ObjectIdentity
)
from pysnmp_sync_adapter import create_transport, parallel_get_sync

engine = SnmpDispatcher()
community = CommunityData("public", mpModel=0)
transport = create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

queries = [
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")),  # single-OID
    [
        ObjectType(ObjectIdentity("1.3.6.1.2.1.1.3.0")),
        ObjectType(ObjectIdentity("1.3.6.1.2.1.1.5.0"))
    ],  # grouped OIDs
    ObjectType(ObjectIdentity("1.3.6.1.2.1.2.2.1.2.1"))  # another single-OID
]

results = parallel_get_sync(
    engine,
    community,
    transport,
    queries=queries,  # keyword-only argument
    timeout=5,
    lookupMib=False
)

for errInd, errStat, errIdx, varBinds in results:
    if errInd or errStat:
        print("Error:", errInd or errStat)
    else:
        for oid, val in varBinds:
            print(f"{oid} = {val}")

Example (v3arch):
from pysnmp.hlapi.v3arch.asyncio import (
    SnmpEngine, UsmUserData, ContextData,
    UdpTransportTarget, ObjectType, ObjectIdentity
)
from pysnmp_sync_adapter import create_transport, parallel_get_sync

engine = SnmpEngine()  # v3arch uses SnmpEngine, not SnmpDispatcher
auth = UsmUserData("usr", authKey=b"abc", privKey=b"xyz")
context = ContextData()
transport = create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

results = parallel_get_sync(
    engine,
    auth,
    transport,
    context,  # positional argument placed before the var-binds
    queries=[
        ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")),
        ObjectType(ObjectIdentity("1.3.6.1.2.1.1.3.0"))
    ],
    timeout=5
)

`cluster_varbinds` is an ancillary function that normalizes and clusters queries into flat chunks of up to `max_per_pdu` var-binds per PDU.
It converts a list of OIDs into ordered sublists, each representing one SNMP PDU with no more than `max_per_pdu` var-binds. This enables efficient batching and parallel dispatch via `parallel_get_sync`, improving throughput by reducing per-PDU overhead while preserving request order.
Depending on the use case, the performance improvement can be significant.
Example of usage:
# OIDs as plain strings, grouped as they were originally requested
raw_queries = [
    ['1.3.6.1.2.1.1.1.0'],
    ['1.3.6.1.2.1.1.2.0', '1.3.6.1.2.1.1.3.0'],
    ['1.3.6.1.2.1.1.4.0']
]

# Wrap every OID string in ObjectType(ObjectIdentity(...))
wrapped_queries = [
    [ObjectType(ObjectIdentity(x)) for x in group]
    for group in raw_queries
]

# Re-cluster into PDUs of at most 10 var-binds; this can noticeably improve performance
wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=10)

raw_results = parallel_get_sync(
    engine,
    auth,
    transport,
    queries=wrapped_queries,
    max_parallel=5
)

Description:
def cluster_varbinds(
    queries: Sequence[Union[ObjectType, Sequence[ObjectType]]],
    max_per_pdu: int
) -> List[List[ObjectType]]:

Parameters:
- `queries` (`Sequence[ObjectType]` or `Sequence[Sequence[ObjectType]]`): A mixed list where each element is either a single `ObjectType` or a list/tuple of them.
- `max_per_pdu` (`int`): Maximum number of var-binds to include in each PDU. Must be >= 1.

Returns:
- `List[List[ObjectType]]`: A list of flat sub-lists, each containing up to `max_per_pdu` `ObjectType` instances, preserving the original order.

Usage:
from pysnmp.hlapi import ObjectType, ObjectIdentity
# Prepare a mixed sequence of queries
raw_queries = [
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
    [
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.2.0')),
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.3.0'))
    ],
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.4.0'))
]

# Chunk into PDUs of max 2 var-binds each
pdus = cluster_varbinds(raw_queries, max_per_pdu=2)
# pdus == [
#     [OT('1.3.6.1.2.1.1.1.0'), OT('1.3.6.1.2.1.1.2.0')],
#     [OT('1.3.6.1.2.1.1.3.0'), OT('1.3.6.1.2.1.1.4.0')]
# ]
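
The flatten-then-chunk idea behind this result can be reproduced with plain strings. The sketch below is a hypothetical re-implementation (`cluster_sketch` is not part of the package) and assumes that clustering amounts to flattening the mixed input in order and slicing it into chunks. The real `cluster_varbinds` may perform additional normalization that is not shown here.

```python
from typing import List, Sequence, Union


def cluster_sketch(
    queries: Sequence[Union[str, Sequence[str]]],
    max_per_pdu: int,
) -> List[List[str]]:
    """Flatten a mixed list of items and sub-lists, then chunk it in order."""
    if max_per_pdu < 1:
        raise ValueError("max_per_pdu must be >= 1")

    flat: List[str] = []
    for item in queries:
        if isinstance(item, (list, tuple)):
            flat.extend(item)   # a group contributes all of its members
        else:
            flat.append(item)   # a bare item contributes itself

    # Slice the flat list into consecutive chunks of at most max_per_pdu.
    return [flat[i:i + max_per_pdu] for i in range(0, len(flat), max_per_pdu)]


oids = [
    "1.3.6.1.2.1.1.1.0",
    ["1.3.6.1.2.1.1.2.0", "1.3.6.1.2.1.1.3.0"],
    "1.3.6.1.2.1.1.4.0",
]
print(cluster_sketch(oids, max_per_pdu=2))
```

With `max_per_pdu=2` the four OIDs end up in two chunks of two, mirroring the `pdus` layout in the usage example.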
`create_transport` synchronously awaits the async `create()` factory on the given transport class (`UdpTransportTarget` or `Udp6TransportTarget`) and passes `timeout` and `retries` only if they are not `None`, returning a ready-to-use transport object.
def create_transport(
    transport_cls, *addr, timeout=None, retries=None, **other_kwargs
)
Example for IPv4:
create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)
Example for IPv6:
create_transport(Udp6TransportTarget, ("2001:db8::1", 161), timeout=2)
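
To make the "only passes timeout and retries if they are not None" rule concrete, here is a minimal sketch using a fake transport class. `FakeTransport` and `create_transport_sketch` are hypothetical, no DNS lookup or socket is involved, and `asyncio.run()` stands in for the adapter's shared-loop machinery.

```python
import asyncio


class FakeTransport:
    """Hypothetical stand-in for a transport class with an async create() factory."""

    def __init__(self, addr, **kwargs):
        self.addr = addr
        self.kwargs = kwargs

    @classmethod
    async def create(cls, addr, **kwargs):
        await asyncio.sleep(0)  # pretend to do asynchronous setup work
        return cls(addr, **kwargs)


def create_transport_sketch(transport_cls, *addr, timeout=None, retries=None,
                            **other_kwargs):
    """Await transport_cls.create() synchronously, forwarding only set options."""
    kwargs = dict(other_kwargs)
    if timeout is not None:
        kwargs["timeout"] = timeout
    if retries is not None:
        kwargs["retries"] = retries
    return asyncio.run(transport_cls.create(*addr, **kwargs))


t = create_transport_sketch(FakeTransport, ("demo.pysnmp.com", 161), timeout=2)
print(t.addr, t.kwargs)
```

Because `retries` is left at `None`, it is never forwarded, so the transport class falls back to its own default.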
- `ensure_loop()`: Retrieves the current default event loop via `asyncio.get_event_loop()`, or creates and sets one if none exists. Ensures one loop is available per thread.
- `_sync_coro()`: Executes a coroutine to completion on the shared event loop, with optional timeout support via `asyncio.wait_for()`. Handles already-running loops by scheduling a future.
- `_sync_agen()`: Collects all items from an async generator (e.g., `walk_cmd`) into a list by internally awaiting it with `_sync_coro()`.
- `make_sync()`: Higher-order function that wraps PySNMP async-HLAPI coroutines into synchronous functions, propagating optional `timeout` arguments.

By avoiding per-call event loop instantiation and by reusing transport targets, this implementation significantly reduces runtime overhead in tight polling or query loops.
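
The general pattern behind these helpers can be sketched with the standard library alone. Everything below is illustrative: the `*_sketch` functions and the `fake_*` coroutines are hypothetical, and the sketch keeps its own module-level loop created with `asyncio.new_event_loop()` rather than calling `asyncio.get_event_loop()` as the adapter is described to do. It also does not handle an already-running loop.

```python
import asyncio
import functools

_loop = None  # one loop, created lazily and reused by every wrapped call


def ensure_loop_sketch():
    """Create the shared loop on first use and reuse it afterwards."""
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    return _loop


def make_sync_sketch(async_fn):
    """Wrap a coroutine function into a blocking function with optional timeout."""
    @functools.wraps(async_fn)
    def wrapper(*args, timeout=None, **kwargs):
        loop = ensure_loop_sketch()
        coro = async_fn(*args, **kwargs)
        # wait_for(coro, None) simply awaits the coroutine without a limit.
        return loop.run_until_complete(asyncio.wait_for(coro, timeout))
    return wrapper


def make_sync_agen_sketch(agen_fn):
    """Wrap an async generator function so that it returns a plain list."""
    async def collect(*args, **kwargs):
        return [item async for item in agen_fn(*args, **kwargs)]
    return make_sync_sketch(collect)


async def fake_get_cmd(oid):
    await asyncio.sleep(0)
    return (None, 0, 0, [(oid, "value")])


async def fake_walk_cmd(prefix, count=3):
    for i in range(1, count + 1):
        await asyncio.sleep(0)
        yield (None, 0, 0, [(f"{prefix}.{i}", i)])


get_sync = make_sync_sketch(fake_get_cmd)
walk_sync = make_sync_agen_sketch(fake_walk_cmd)

print(get_sync("1.3.6.1.2.1.1.1.0"))
print(walk_sync("1.3.6.1.2.1.1", count=2, timeout=5))
```

Reusing one loop across calls is what avoids the per-call setup cost mentioned above; calling `asyncio.run()` for every query would create and tear down a loop each time.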
pip install pysnmp-sync-adapter
This will automatically install the latest version of `pysnmp` as a dependency, if it is not already present.
To ensure compatibility with the selected PySNMP architecture (`v1arch` or `v3arch`), make sure to import `pysnmp.hlapi.v3arch.asyncio` (or `v1arch`) before importing from `pysnmp_sync_adapter`. For example:
from pysnmp.hlapi.v3arch.asyncio import * # Must come first (or v1arch)
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)
This ensures that the adapter binds to the appropriate internal PySNMP modules. If the import is omitted or done in the wrong order, `pysnmp_sync_adapter` may fall back to `v1arch` even when `v3arch` is desired.
Alternatively, the environment variable `PYSNMP_ARCH` can be set to "v3arch" (or "v1arch"). Example:
import os
os.environ["PYSNMP_ARCH"] = "v3arch" # or "v1arch"
from pysnmp_sync_adapter import get_cmd_sync # etc.
This method is particularly useful in larger applications or testing scenarios where import order might be harder to control.
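
One way such a selection could work is sketched below. This is purely an assumption for illustration, not the adapter's actual logic: `pick_arch_sketch` is a hypothetical function, and the precedence shown (environment variable first, then already-imported modules, then a `v1arch` fallback) is a guess based on the description above.

```python
import os
import sys


def pick_arch_sketch(environ=None, loaded_modules=None):
    """Return "v1arch" or "v3arch" using an assumed precedence order."""
    environ = os.environ if environ is None else environ
    loaded = sys.modules if loaded_modules is None else loaded_modules

    # 1. An explicit, valid PYSNMP_ARCH value wins (assumed precedence).
    requested = environ.get("PYSNMP_ARCH")
    if requested in ("v1arch", "v3arch"):
        return requested

    # 2. Otherwise, check whether the v3arch HLAPI was imported beforehand.
    if "pysnmp.hlapi.v3arch.asyncio" in loaded:
        return "v3arch"

    # 3. Fall back to v1arch, as the text describes.
    return "v1arch"


print(pick_arch_sketch({"PYSNMP_ARCH": "v3arch"}, {}))
print(pick_arch_sketch({}, {"pysnmp.hlapi.v3arch.asyncio": object()}))
print(pick_arch_sketch({}, {}))
```

Passing the environment and module table as arguments keeps the sketch testable without touching the real process state.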
Note: When both "v1arch" and "v3arch" modules need to be used sequentially in the same program (an unusual technique), purging the relevant modules before re-importing them has been verified to give correct behavior:
import sys

for mod in list(sys.modules):
    if mod.startswith("pysnmp.hlapi.") or mod.startswith("pysnmp_sync_adapter"):
        del sys.modules[mod]

import asyncio
import platform
from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)

if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

community = "public"
dispatcher = SnmpDispatcher()
auth_data = CommunityData(community, mpModel=0)

print("\n--> get_cmd_sync")
error_indication, error_status, error_index, var_binds = get_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> set_cmd_sync")
error_indication, error_status, error_index, var_binds = set_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0), "Linux i386"),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> next_cmd_sync")
error_indication, error_status, error_index, var_binds = next_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_cmd_sync")
error_indication, error_status, error_index, var_binds = bulk_cmd_sync(
    dispatcher,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    0,
    2,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> walk_cmd_sync")
objects = walk_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_walk_cmd_sync")
objects = bulk_walk_cmd_sync(
    dispatcher,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    0,
    25,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

import asyncio
import platform
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)

if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

community = "public"
engine = SnmpEngine()

print("\n--> get_cmd_sync")
error_indication, error_status, error_index, var_binds = get_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> set_cmd_sync")
error_indication, error_status, error_index, var_binds = set_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0), "Linux i386"),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> next_cmd_sync")
error_indication, error_status, error_index, var_binds = next_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_cmd_sync")
error_indication, error_status, error_index, var_binds = bulk_cmd_sync(
    engine,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    0,
    2,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> walk_cmd_sync")
objects = walk_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_walk_cmd_sync")
objects = bulk_walk_cmd_sync(
    engine,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    0,
    25,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

This adapter provides compatibility for code written against other libraries that use synchronous SNMP commands. It allows legacy SNMPv1/v2c/v3 code to run unchanged, while taking advantage of simplified synchronous operation.
Implements wrappers:

Function | Description |
---|---|
`getCmd(...)` | Yields a single `(errInd, errStat, errIdx, varBinds)` from `get_cmd_sync` |
`setCmd(...)` | Same as `getCmd` but for `set_cmd_sync` |
`nextCmd(...)` | Uses `next_cmd_sync` |
`bulkCmd(...)` | Uses `bulk_cmd_sync` |
`walkCmd(...)` | Uses `walk_cmd_sync` |
`bulkWalkCmd(...)` | Uses `bulk_walk_cmd_sync` |
`Udp6TransportTarget(...)` | Legacy-compatible wrapper |
`UdpTransportTarget(...)` | Legacy-compatible wrapper |

These wrappers preserve the iterator-based usage of `pysnmp.hlapi` but operate using blocking, synchronous calls underneath.
Compatibility layer for code written against the Python SNMP library v5 HLAPI (https://github.com/pysnmp/pysnmp) using synchronous SNMP commands.
This library uses `SnmpEngine()` and `ContextData()`. It requires `legacy_wrappers` with `v3arch`.
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter.legacy_wrappers import UdpTransportTarget, getCmd

for errorIndication, errorStatus, errorIndex, varBinds in getCmd(
    SnmpEngine(),
    CommunityData('public', mpModel=0),
    UdpTransportTarget(("demo.pysnmp.com", 161)),
    ContextData(),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
):
    if errorIndication:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    elif errorStatus:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    else:
        for name, val in varBinds:
            print(name, "=", val)

Compatibility layer for code written against the legacy etingof/pysnmp v5 HLAPI (https://github.com/etingof/pysnmp) using synchronous SNMP commands.
This library uses `SnmpDispatcher()` and does not use `ContextData()`. It requires `legacy_wrappers` with `v1arch`.
from pysnmp.hlapi.v1arch.asyncio import *
from pyasn1.type.univ import OctetString as OctetStringType
from pysnmp_sync_adapter.legacy_wrappers import UdpTransportTarget, getCmd
timeout = 2
retries = 2

iterator = getCmd(
    SnmpDispatcher(),
    CommunityData('public', mpModel=0),
    UdpTransportTarget(
        ("demo.pysnmp.com", 161),
        timeout,  # optional parameter
        retries   # optional parameter
    ),
    ('1.3.6.1.2.1.1.1.0', None)
)

for response in iterator:
    errorIndication, errorStatus, errorIndex, varBinds = response
    if errorIndication:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    elif errorStatus:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))

The adapter supports two legacy initialization forms:
UdpTransportTarget(("host", port), timeout, retries)
UdpTransportTarget(("host", port, timeout, retries))
Both forms correctly map to the underlying transport constructor, omitting `timeout` and `retries` if `None`.
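
A minimal sketch of how both call forms could be reduced to one shape follows. `normalize_target_args` is a hypothetical helper, not the adapter's API; it assumes that values embedded in the address tuple take precedence over the separate arguments.

```python
def normalize_target_args(addr, timeout=None, retries=None):
    """Return ((host, port), kwargs) for either legacy call form."""
    host, port, *extra = addr

    # Form 2: ("host", port, timeout, retries) carries the options in the tuple.
    if len(extra) >= 1:
        timeout = extra[0]
    if len(extra) >= 2:
        retries = extra[1]

    # Only forward options that were actually provided.
    kwargs = {}
    if timeout is not None:
        kwargs["timeout"] = timeout
    if retries is not None:
        kwargs["retries"] = retries
    return (host, port), kwargs


print(normalize_target_args(("host", 161), 2, 3))
print(normalize_target_args(("host", 161, 2, 3)))
print(normalize_target_args(("host", 161)))
```

Both populated forms produce the same address and keyword arguments, and the bare form yields no keyword arguments, so the transport's defaults apply.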
If the error indication `errorIndication` is present, the returned message is a string.
In addition, if the message includes "before timeout", it is augmented with " - timed out" for compatibility matching.
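
The string handling described here can be pictured as follows. `legacy_error_text` is a hypothetical helper written only for illustration; the adapter's own implementation may differ.

```python
def legacy_error_text(error_indication):
    """Convert an error indication to a legacy-style string, if one is present."""
    if not error_indication:
        return error_indication  # nothing to convert

    message = str(error_indication)
    # Older client code often matches on "timed out", so append it when relevant.
    if "before timeout" in message:
        message += " - timed out"
    return message


print(legacy_error_text("No SNMP response received before timeout"))
print(legacy_error_text(None))
```

Legacy code that checks for `"timed out" in errorIndication` keeps working, while other messages pass through unchanged.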
Compatibility layer for code written against the `cmdgen` SNMP library appearing in `pysnmp.entity.rfc3413.oneliner`, using synchronous SNMP commands.
It needs `v3arch`, transparently inserts the required `SnmpEngine()` and `ContextData()` parameters for SNMPv3 (`v3arch.asyncio`) calls, and wraps OID tuples in `ObjectType(ObjectIdentity(...))`. Legacy UDP and UDP6 transports, including timeout and retry arguments, are preserved.
from pysnmp.hlapi.v3arch.asyncio import *
import pysnmp_sync_adapter.cmdgen_wrappers as cmdgen
cmd_gen = cmdgen.CommandGenerator()
transport = cmdgen.UdpTransportTarget(("demo.pysnmp.com", 161), timeout=5, retries=1)
oid = '1.3.6.1.2.1.1.1.0'
oid_tuple = tuple(int(part) for part in oid.split('.'))
comm_data = cmdgen.CommunityData('public', mpModel=0)

error_indication, error_status, error_index, var_binds = cmd_gen.getCmd(
    comm_data,
    transport,
    oid_tuple
)

for name, val in var_binds:
    print(f'{name} = {val}')

Another example:
from pysnmp.hlapi.v3arch.asyncio import *
import pysnmp_sync_adapter.cmdgen_wrappers as cmdgen

_oids = ('1.3.6.1.2.1.1.1.0', '1.3.6.1.2.1.1.4.0',
         '1.3.6.1.2.1.1.5.0', '1.3.6.1.2.1.1.6.0')
user = 'myUser'
authKe = 'authPassword'
privKe = 'privPassword'
authProto = usmHMACSHAAuthProtocol
privProto = usmAesCfb128Protocol

cmdGen = cmdgen.CommandGenerator()
cmdGen.getCmd(
    cmdgen.UsmUserData(
        user, authKey=authKe, privKey=privKe,
        authProtocol=authProto, privProtocol=privProto
    ),
    cmdgen.UdpTransportTarget(("demo.pysnmp.com", 161)),
    # One ObjectType per OID, unpacked as positional arguments
    *(ObjectType(ObjectIdentity(oid)) for oid in _oids)
)

- These adapters block the calling thread until the SNMP operation completes.
- They rely on the default `asyncio` event loop obtained via `asyncio.get_event_loop()`. If no loop is set, one is created and registered. They do not create isolated loops.
- Since PySNMP uses the default event loop bound to the current thread, invoking these synchronous wrappers from a thread that is already running an event loop may cause deadlocks or `RuntimeError`. To use them safely in such environments, run them from a separate thread. Hybrid usage of both native async code and sync queries via the `_sync` API in the same application is strongly discouraged unless carefully managed, because triggering sync queries while the asyncio event loop is running may cause undefined behavior or deadlocks.
- A timeout (in seconds) can optionally be passed to all sync wrappers; it limits the total wall-clock time of the SNMP operation using `asyncio.wait_for()`. On timeout, `asyncio.TimeoutError` is raised.
- The underlying transport layer's timeouts (e.g. `UdpTransportTarget(..., timeout=2)`) still apply, and should be set appropriately to avoid low-level blocking.
- These wrappers do not forcibly cancel low-level transport operations. A timeout interrupts the coroutine, but not the transport at the OS level.
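
For the "run them from a separate thread" advice, one common standard-library pattern is to hand the blocking call to a worker thread with `run_in_executor()`. The sketch below uses a hypothetical stand-in, `blocking_query_sketch`, in place of a real `*_sync` call so that it runs without network access. Whether the adapter's wrappers behave well in a given worker thread still depends on the event-loop notes above.

```python
import asyncio
import time


def blocking_query_sketch(oid):
    """Hypothetical stand-in for a *_sync call: blocks the calling thread."""
    time.sleep(0.01)
    return (None, 0, 0, [(oid, "value")])


async def main():
    loop = asyncio.get_running_loop()
    # The blocking call runs in the default thread pool, so the running
    # event loop stays free to serve other tasks in the meantime.
    return await loop.run_in_executor(
        None, blocking_query_sketch, "1.3.6.1.2.1.1.1.0"
    )


result = asyncio.run(main())
print(result)
```

The async side only awaits a future, so the blocking call never executes on the thread that owns the running loop.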
This repository uses the public SNMP simulation service at demo.pysnmp.com, provided courtesy of Lextudio. Please ensure network access to demo.pysnmp.com:161 is available when running the tests (`python -m pytest`).
- PySNMP: https://docs.lextudio.com/snmp/
- PySNMP API Reference: https://docs.lextudio.com/pysnmp/v7.1/docs/api-reference
EUPL-1.2 License - See LICENCE for details.