From 53f5aee7d2f5096294d57b69bd1060ff1e110c63 Mon Sep 17 00:00:00 2001 From: Martin Teichmann Date: Tue, 14 May 2019 17:14:29 +0200 Subject: [PATCH 1/2] Add asyncio support --- vxi11/rpc.py | 99 +++++++-- vxi11/vxi11.py | 562 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 645 insertions(+), 16 deletions(-) diff --git a/vxi11/rpc.py b/vxi11/rpc.py index 9d889a6..d530775 100644 --- a/vxi11/rpc.py +++ b/vxi11/rpc.py @@ -23,6 +23,7 @@ import socket import os import struct +import asyncio RPCVERSION = 2 @@ -281,6 +282,84 @@ def do_call(self): # xid larger than expected - packet from the future? raise RPCError('wrong xid in reply %r instead of %r' % (xid, self.lastxid)) +class AsyncTCPClient(Client): + async def connect(self): + if self.port == 0: + pmap = AsyncTCPPortMapperClient(self.host) + await pmap.connect() + port = await pmap.get_port((self.prog, self.vers, IPPROTO_TCP, 0)) + pmap.close() + else: + port = self.port + if port == 0: + raise RPCError('program not registered') + + self.reader, self.writer = await asyncio.open_connection( + self.host, port) + + def close(self): + self.writer.close() + + def sendfrag(self, frag, last=True): + x = len(frag) + + if last: + x = x | 0x80000000 + header = struct.pack(">I", x) + self.writer.write(header) + self.writer.write(frag) + + async def recvfrag(self): + header = await self.reader.readexactly(4) + x, = struct.unpack(">I", header) + last = ((x & 0x80000000) != 0) + n = int(x & 0x7fffffff) + frag = await self.reader.readexactly(n) + return last, frag + + async def recvrecord(self): + ret = [] + last = 0 + while not last: + last, frag = await self.recvfrag() + ret.append(frag) + return b''.join(ret) + + async def do_call(self): + call = self.packer.get_buf() + self.sendfrag(call) + + while True: + reply = await self.recvrecord() + u = self.unpacker + u.reset(reply) + xid, verf = u.unpack_replyheader() + if xid == self.lastxid: + # xid matches, we're done + return + elif xid < self.lastxid: + # Stale data in buffer due to interruption + # Discard and fetch another record + continue + else: + # xid larger than expected - packet from the future? + raise RPCError('wrong xid in reply %r instead of %r' % (xid, self.lastxid)) + + async def make_call(self, proc, args, pack_func, unpack_func): + # Don't normally override this (but see Broadcast) + if pack_func is None and args is not None: + raise TypeError('non-null args with null pack_func') + self.start_call(proc) + if pack_func: + pack_func(args) + await self.do_call() + if unpack_func: + result = unpack_func() + else: + result = None + self.unpacker.done() + return result + # Client using UDP to a specific port @@ -456,7 +535,8 @@ def unpack_call_result(self): class PartialPortMapperClient: - def __init__(self): + def __init__(self, host): + super(PartialPortMapperClient, self).__init__(host, PMAP_PROG, PMAP_VERS, PMAP_PORT) self.packer = PortMapperPacker() self.unpacker = PortMapperUnpacker('') @@ -487,24 +567,19 @@ def callit(self, ca): class TCPPortMapperClient(PartialPortMapperClient, RawTCPClient): - - def __init__(self, host): - RawTCPClient.__init__(self, host, PMAP_PROG, PMAP_VERS, PMAP_PORT) - PartialPortMapperClient.__init__(self) + pass class UDPPortMapperClient(PartialPortMapperClient, RawUDPClient): - - def __init__(self, host): - RawUDPClient.__init__(self, host, PMAP_PROG, PMAP_VERS, PMAP_PORT) - PartialPortMapperClient.__init__(self) + pass class BroadcastUDPPortMapperClient(PartialPortMapperClient, RawBroadcastUDPClient): + pass + - def __init__(self, bcastaddr): - RawBroadcastUDPClient.__init__(self, bcastaddr, PMAP_PROG, PMAP_VERS, PMAP_PORT) - PartialPortMapperClient.__init__(self) +class AsyncTCPPortMapperClient(PartialPortMapperClient, AsyncTCPClient): + pass # Generic clients that find their server through the Port mapper diff --git a/vxi11/vxi11.py b/vxi11/vxi11.py index 231582d..63fb543 100644 --- a/vxi11/vxi11.py +++ b/vxi11/vxi11.py @@ -29,6 +29,7 @@ import re import struct import time +import asyncio # VXI-11 RPC constants @@ -392,11 +393,12 @@ def done(self): pass -class CoreClient(rpc.TCPClient): +class CoreClientMixin: def __init__(self, host, port=0): self.packer = Packer() self.unpacker = Unpacker('') - rpc.TCPClient.__init__(self, host, DEVICE_CORE_PROG, DEVICE_CORE_VERS, port) + super(CoreClientMixin, self).__init__(host, DEVICE_CORE_PROG, + DEVICE_CORE_VERS, port) def create_link(self, id, lock_device, lock_timeout, name): params = (id, lock_device, lock_timeout, name) @@ -486,11 +488,20 @@ def destroy_intr_chan(self): self.unpacker.unpack_device_error) -class AbortClient(rpc.TCPClient): +class CoreClient(CoreClientMixin, rpc.TCPClient): + pass + + +class AsyncCoreClient(CoreClientMixin, rpc.AsyncTCPClient): + pass + + +class AbortClientMixin(rpc.TCPClient): def __init__(self, host, port=0): self.packer = Packer() self.unpacker = Unpacker('') - rpc.TCPClient.__init__(self, host, DEVICE_ASYNC_PROG, DEVICE_ASYNC_VERS, port) + super(AbortClientMixin, self).__init__(host, DEVICE_ASYNC_PROG, + DEVICE_ASYNC_VERS, port) def device_abort(self, link): return self.make_call(DEVICE_ABORT, link, @@ -498,6 +509,14 @@ def device_abort(self, link): self.unpacker.unpack_device_error) +class AbortClient(AbortClientMixin, rpc.TCPClient): + pass + + +class AsyncAbortClient(AbortClientMixin, rpc.AsyncTCPClient): + pass + + def list_devices(ip=None, timeout=1): "Detect VXI-11 devices on network" @@ -835,6 +854,494 @@ def unlock(self): self.locked = False +class AsyncDevice(Device): + "VXI-11 device interface client" + + async def open(self): + "Open connection to VXI-11 device" + assert self.link is None + + if self.client is None: + self.client = AsyncCoreClient(self.host) + await self.client.connect() + + error, link, abort_port, max_recv_size = await self.client.create_link( + self.client_id, + 0, + self._lock_timeout_ms, + self.name.encode("utf-8") + ) + + if error: + raise Vxi11Exception(error, 'open') + + self.abort_port = abort_port + + self.link = link + self.max_recv_size = min(max_recv_size, 1024*1024) + + __aenter__ = open + + async def close(self): + "Close connection" + assert self.link is not None + + await self.client.destroy_link(self.link) + self.client.close() + self.link = None + self.client = None + + async def __aexit__(self, etype, exc, tb): + await self.close() + + async def abort(self): + "Asynchronous abort" + assert self.link is not None + + if self.abort_client is None: + self.abort_client = AbortClient(self.host, self.abort_port) + + error = await self.abort_client.device_abort(self.link) + + if error: + raise Vxi11Exception(error, 'abort') + + async def write_raw(self, data): + "Write binary data to instrument" + assert self.link is not None + + if self.term_char is not None: + flags = OP_FLAG_TERMCHAR_SET + term_char = str(self.term_char).encode('utf-8')[0] + data += term_char + + flags = 0 + + num = len(data) + + offset = 0 + + while num > 0: + if num <= self.max_recv_size: + flags |= OP_FLAG_END + + block = data[offset:offset+self.max_recv_size] + + error, size = await self.client.device_write( + self.link, + self._timeout_ms, + self._lock_timeout_ms, + flags, + block + ) + + if error: + raise Vxi11Exception(error, 'write') + elif size < len(block): + raise Vxi11Exception("did not write complete block", 'write') + + offset += size + num -= size + + async def read_raw(self, num=-1): + "Read binary data from instrument" + assert self.link is not None + + read_len = self.max_read_len + if num > 0: + read_len = min(num, self.max_read_len) + + flags = 0 + reason = 0 + + term_char = 0 + + if self.term_char is not None: + flags = OP_FLAG_TERMCHAR_SET + term_char = str(self.term_char).encode('utf-8')[0] + + read_data = [] + + while reason & (RX_END | RX_CHR) == 0: + error, reason, data = await self.client.device_read( + self.link, + read_len, + self._timeout_ms, + self._lock_timeout_ms, + flags, + term_char + ) + + if error: + raise Vxi11Exception(error, 'read') + + read_data.append(data) + + if num > 0: + num = num - len(data) + if num <= 0: + break + if num < read_len: + read_len = num + + return b''.join(read_data) + + async def ask_raw(self, data, num=-1): + "Write then read binary data" + await self.write_raw(data) + return (await self.read_raw(num)) + + async def write(self, message, encoding='utf-8'): + "Write string to instrument" + if isinstance(message, (tuple, list)): + for message_i in message: + await self.write(message_i, encoding) + return + + await self.write_raw(str(message).encode(encoding)) + + async def read(self, num=-1, encoding='utf-8'): + "Read string from instrument" + return (await self.read_raw(num)).decode(encoding).rstrip('\r\n') + + async def ask(self, message, num=-1, encoding = 'utf-8'): + "Write then read string" + if isinstance(message, (tuple, list)): + val = [] + for message_i in message: + val.append(await self.ask(message_i, num, encoding)) + return val + + await self.write(message, encoding) + return (await self.read(num, encoding)) + + async def trigger(self): + "Send trigger command" + flags = 0 + + error = await self.client.device_trigger( + self.link, + flags, + self._lock_timeout_ms, + self._timeout_ms + ) + + if error: + raise Vxi11Exception(error, 'trigger') + + async def clear(self): + "Send clear command" + flags = 0 + + error = await self.client.device_clear( + self.link, + flags, + self._lock_timeout_ms, + self._timeout_ms + ) + + if error: + raise Vxi11Exception(error, 'clear') + + async def lock(self): + "Send lock command" + flags = 0 + + error = await self.client.device_lock( + self.link, + flags, + self._lock_timeout_ms + ) + + if error: + raise Vxi11Exception(error, 'lock') + + self.locked = True + + async def unlock(self): + "Send unlock command" + flags = 0 + + error = await self.client.device_unlock(self.link) + + if error: + raise Vxi11Exception(error, 'unlock') + + self.locked = False + + +class AsyncInterfaceDevice(AsyncDevice): + "VXI-11 IEEE 488.1 interface device interface client" + + async def open(self): + "Open connection to VXI-11 device" + assert self.link is None + + if ',' in self.name: + raise Vxi11Exception("Cannot specify address for InterfaceDevice") + + await super(InterfaceDevice, self).open() + + self._bus_address = self.get_bus_address() + + async def send_command(self, data): + "Send command" + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_SEND_COMMAND, + True, + 1, + data + ) + + if error: + raise Vxi11Exception(error, 'send_command') + + return data_out + + def create_setup(self, address_list): + data = [self._bus_address | GPIB_CMD_TAD, GPIB_CMD_UNL] + + if type(address_list) is int: + address_list = [address_list] + + for addr in address_list: + if isinstance(addr, tuple): + if addr[0] < 0 or addr[0] > 30: + raise Vxi11Exception("Invalid address", 'create_setup') + data.append(addr[0] | GPIB_CMD_LAD) + if len(addr) > 1: + if addr[1] < 0 or addr[1] > 30: + raise Vxi11Exception("Invalid address", 'create_setup') + data.append(addr[1] | GPIB_CMD_SAD) + else: + if addr < 0 or addr > 30: + raise Vxi11Exception("Invalid address", 'create_setup') + data.append(addr | GPIB_CMD_LAD) + + return bytes(data) + + async def send_setup(self, address_list): + "Send setup" + return (await self.send_command(self.create_setup(address_list))) + + async def _bus_status(self, val): + "Bus status" + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_BUS_STATUS, + True, + 2, + struct.pack('!H', val) + ) + + if error: + raise Vxi11Exception(error, 'bus_status') + + return struct.unpack('!H', data_out)[0] + + async def test_ren(self): + "Read REN line" + return (await self._bus_status(CMD_BUS_STATUS_REMOTE)) + + async def test_srq(self): + "Read SRQ line" + return (await self._bus_status(CMD_BUS_STATUS_SRQ)) + + async def test_ndac(self): + "Read NDAC line" + return (await self._bus_status(CMD_BUS_STATUS_NDAC)) + + async def is_system_controller(self): + "Check if interface device is a system controller" + return await self._bus_status(CMD_BUS_STATUS_SYSTEM_CONTROLLER) + + async def is_controller_in_charge(self): + "Check if interface device is the controller-in-charge" + return await self._bus_status(CMD_BUS_STATUS_CONTROLLER_IN_CHARGE) + + async def is_talker(self): + "Check if interface device is addressed as a talker" + return await self._bus_status(CMD_BUS_STATUS_TALKER) + + async def is_listener(self): + "Check if interface device is addressed as a listener" + return await self._bus_status(CMD_BUS_STATUS_LISTENER) + + async def get_bus_address(self): + "Get interface device bus address" + return await self._bus_status(CMD_BUS_STATUS_BUS_ADDRESS) + + async def set_atn(self, val): + "Set ATN line" + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_ATN_CTRL, + True, + 2, + struct.pack('!H', val) + ) + + if error: + raise Vxi11Exception(error, 'set_atn') + + return struct.unpack('!H', data_out)[0] + + async def set_ren(self, val): + "Set REN line" + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_REN_CTRL, + True, + 2, + struct.pack('!H', val) + ) + + if error: + raise Vxi11Exception(error, 'set_ren') + + return struct.unpack('!H', data_out)[0] + + async def pass_control(self, addr): + "Pass control to another controller" + + if addr < 0 or addr > 30: + raise Vxi11Exception("Invalid address", 'pass_control') + + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_PASS_CTRL, + True, + 4, + struct.pack('!L', addr) + ) + + if error: + raise Vxi11Exception(error, 'pass_control') + + return struct.unpack('!L', data_out)[0] + + async def set_bus_address(self, addr): + "Set interface device bus address" + + if addr < 0 or addr > 30: + raise Vxi11Exception("Invalid address", 'set_bus_address') + + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_BUS_ADDRESS, + True, + 4, + struct.pack('!L', addr) + ) + + if error: + raise Vxi11Exception(error, 'set_bus_address') + + self._bus_address = addr + + return struct.unpack('!L', data_out)[0] + + async def send_ifc(self): + "Send IFC" + flags = 0 + + error, data_out = await self.client.device_docmd( + self.link, + flags, + self._timeout_ms, + self._lock_timeout_ms, + CMD_IFC_CTRL, + True, + 1, + b'' + ) + + if error: + raise Vxi11Exception(error, 'send_ifc') + + async def find_listeners(self, address_list=None): + "Find devices" + if address_list is None: + address_list = list(range(31)) + address_list.remove(self._bus_address) + + found = [] + + await self.lock() + try: + for addr in address_list: + # check for listener at primary address + cmd = [GPIB_CMD_UNL, GPIB_CMD_UNT] + cmd.append(self._bus_address | GPIB_CMD_TAD) # spec says this is unnecessary, but doesn't appear to work without this + if type(addr) is tuple: + addr = addr[0] + if addr < 0 or addr > 30: + raise Vxi11Exception("Invalid address", 'find_listeners') + cmd.append(addr | GPIB_CMD_LAD) + await self.send_command(bytes(cmd)) + await self.set_atn(False) + await asyncio.sleep(0.0015) # probably not necessary due to network delays + if await self.test_ndac(): + found.append(addr) + else: + # check for listener at any sub-address + cmd = [GPIB_CMD_UNL, GPIB_CMD_UNT] + cmd.append(self._bus_address | GPIB_CMD_TAD) # spec says this is unnecessary, but doesn't appear to work without this + cmd.append(addr | GPIB_CMD_LAD) + for sa in range(31): + cmd.append(sa | GPIB_CMD_SAD) + await self.send_command(bytes(cmd)) + await self.set_atn(False) + await asyncio.sleep(0.0015) # probably not necessary due to network delays + if await self.test_ndac(): + # find specific sub-address + for sa in range(31): + cmd = [GPIB_CMD_UNL, GPIB_CMD_UNT] + cmd.append(self._bus_address | GPIB_CMD_TAD) # spec says this is unnecessary, but doesn't appear to work without this + cmd.append(addr | GPIB_CMD_LAD) + cmd.append(sa | GPIB_CMD_SAD) + await self.send_command(bytes(cmd)) + await self.set_atn(False) + await asyncio.sleep(0.0015) # probably not necessary due to network delays + if self.test_ndac(): + found.append((addr, sa)) + finally: + await self.unlock() + + return found + + class InterfaceDevice(Device): "VXI-11 IEEE 488.1 interface device interface client" def __init__(self, host, name = None, client_id = None, term_char = None): @@ -1208,3 +1715,50 @@ def local(self): if error: raise Vxi11Exception(error, 'local') +class AsyncInstrument(AsyncDevice): + "VXI-11 instrument interface client" + + async def read_stb(self): + "Read status byte" + flags = 0 + + error, stb = await self.client.device_read_stb( + self.link, + flags, + self._lock_timeout_ms, + self._timeout_ms + ) + + if error: + raise Vxi11Exception(error, 'read_stb') + + return stb + + async def remote(self): + "Send remote command" + flags = 0 + + error = await self.client.device_remote( + self.link, + flags, + self._lock_timeout_ms, + self._timeout_ms + ) + + if error: + raise Vxi11Exception(error, 'remote') + + async def local(self): + "Send local command" + flags = 0 + + error = await self.client.device_local( + self.link, + flags, + self._lock_timeout_ms, + self._timeout_ms + ) + + if error: + raise Vxi11Exception(error, 'local') + From 070abb2f9d00c8c198c324f68b9964bb38a09dfd Mon Sep 17 00:00:00 2001 From: Cyril Danilevski Date: Fri, 31 May 2019 15:06:53 +0200 Subject: [PATCH 2/2] Add asyncio example --- README.md | 28 ++++++++++++++++++++++++++++ doc/examples.rst | 30 ++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/README.md b/README.md index b29063d..56d289e 100644 --- a/README.md +++ b/README.md @@ -68,3 +68,31 @@ and: print(instr.ask("*IDN?")) # returns 'AGILENT TECHNOLOGIES,MSO-X 3014A,MY********,02.35.2013061800' + +## Python 3 asyncio usage example + + from asyncio import get_event_loop + from vxi11 import vxi11 + + + instr = vxi11.AsyncInstrument("sqs-ilh-las-osz_dpo71254c") + + + async def acquire(): + await instr.open() + print(await instr.ask("*IDN?")) + # In our case, prints 'TEKTRONIX,DPO71254C,C500158,CF:91.1CT FV:10.5.1 Build 24' + + cmd = "DAT:SOU " + ",".join(f"CH{x}" for x in range(1, 5)) + await instr.write(cmd) + + await instr.write("WAVFRMS?") + + readout = await instr.read_raw(-1) + print(readout) + + await instr.close() + + + loop = get_event_loop() + loop.run_until_complete(acquire()) diff --git a/doc/examples.rst b/doc/examples.rst index e90dda5..23414cf 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -29,3 +29,33 @@ Open a connection and set the timeout:: >>> instr.timeout = 60*1000 >>> print(instr.ask("*TST?")) '0' + +Asyncio connections +=================== +The **AsyncInstrument** class can be used for usage with asyncio:: + + from asyncio import get_event_loop + from vxi11 import vxi11 + + + instr = vxi11.AsyncInstrument("sqs-ilh-las-osz_dpo71254c") + + + async def acquire(): + await instr.open() + print(await instr.ask("*IDN?")) + # In our case, prints 'TEKTRONIX,DPO71254C,C500158,CF:91.1CT FV:10.5.1 Build 24' + + cmd = "DAT:SOU " + ",".join(f"CH{x}" for x in range(1, 5)) + await instr.write(cmd) + + await instr.write("WAVFRMS?") + + readout = await instr.read_raw(-1) + print(readout) + + await instr.close() + + + loop = get_event_loop() + loop.run_until_complete(acquire())