# device_simulator.py (1.7 KB)
#!/usr/bin/env python3

import sys

#from device_controller.abstract_component.sbig.server_udp_or_tcp_sbig import get_SocketServer_UDP_TCP_sbig
from device_controller.channels.server_udp_or_tcp import get_SocketServer_UDP_TCP

# Host name handed to the server factory in DeviceSimulator.serve_forever().
HOST = "localhost"


class UnknownCommandException(Exception):
    """Exception type named for unrecognised commands.

    It adds no attributes or behaviour on top of ``Exception``.
    """

# See https://stackoverflow.com/questions/10085996/shutdown-socketserver-serve-forever-in-one-thread-python-application

# Abstract class
class DeviceSimulator:
    """Base class for device simulators driven by a single socket server.

    All state is kept in class attributes and every method is a classmethod,
    so the class itself (or a subclass) is used directly, without instances.
    """

    # Server object built by the factory in serve_forever(); None until then.
    myserver = None
    # Values stored verbatim by set_protoc_and_cmds().
    _protoc = None
    _gen2nat_cmds = None

    @classmethod
    def set_protoc_and_cmds(cls, protoc, gen2nat_cmds, DEBUG=False):
        """Store the protocol and the command table on the class.

        Args:
            protoc: Stored as-is in ``cls._protoc``.
            gen2nat_cmds: Stored as-is in ``cls._gen2nat_cmds`` (presumably a
                generic-to-native command mapping; confirm with callers).
            DEBUG: Accepted for interface compatibility; currently unused.
        """
        cls._protoc = protoc
        cls._gen2nat_cmds = gen2nat_cmds
        print("****** (SIM) my cmds are:", cls._gen2nat_cmds, "******")

    @classmethod
    def serve_forever(cls, PORT, func_server=get_SocketServer_UDP_TCP):
        """Create the server and run its request loop until it is stopped.

        The loop ends when ``stop()`` is called (from another thread), on
        Ctrl-C, or when the server raises. In every case the server is
        closed before this method returns or re-raises.

        Args:
            PORT: Port passed to the server factory.
            func_server: Factory called as ``func_server(HOST, PORT, "UDP")``;
                it must return an object exposing ``serve_forever()``,
                ``server_close()`` and ``shutdown()``.
        """
        server = func_server(HOST, PORT, "UDP")
        cls.myserver = server
        print("******** myserver START: *********", server)
        try:
            # Handles requests in a loop; returns only once the loop is broken.
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is treated as a normal way to end the simulator.
            pass
        finally:
            # Cleanup runs on every exit path, so an unexpected error in the
            # loop no longer leaves the socket open.
            print("******** myserver SHUTDOWN: *********", server)
            server.server_close()
            # NOTE(review): the loop has already exited here, so for a
            # standard-library socketserver this call should be redundant.
            # It is kept to preserve the original call sequence -- confirm
            # the project server class before removing it.
            server.shutdown()

    @classmethod
    def stop(cls):
        """Call ``shutdown()`` on the current server, if there is one.

        Does nothing (apart from the trace print) when ``serve_forever()``
        has not created a server yet, instead of failing on ``None``.

        NOTE(review): if the server is a standard-library ``socketserver``
        server, ``shutdown()`` must be called from a thread other than the
        one running ``serve_forever()``.
        """
        server = cls.myserver
        print("******** myserver: *********", server)
        if server is None:
            return
        server.shutdown()


'''
def getp(): return DeviceSimulator._protoc
def getc(): return DeviceSimulator._gen2nat_cmds
'''