智能电表

智能电表

    智能电表解决方案基于 QuecPython_EG91X系列C1-P02开发板实现,具有以下特性:

    • 包含了串口通信模块、TCP 客户端、RFC1662 协议、DLMS 协议(开发中)等电表行业常用功能组件。
    • 拥有基础的应用框架,用户可基于该框架完善应用程序开发。
    • 使用 Python 语言,便于二次开发。
    • 开发资源
    • 快速上手
    • 软件设计

    开发板资料

    模组资料

    配件资料

    开发工具

    • QPYcom - QuecPython 调试工具

      • 版本:V3.6.0

    • VSCode - 代码编辑器

    固件包

    • EG912U固件包
      • 版本:QPY_OCPU_BETA0002_EG912U_GLAA_FW
      • 下载固件包

    实验源码

    • 版本:v1.0.0

    • 从 Github 仓库获取源代码步骤如下:

      git clone https://github.com/QuecPython/solution-electricity-meter
      cd solution-electricity-meter
      git checkout v1.0.0
      
    • 压缩包下载

    硬件准备

    环境搭建

    硬件连接

    按照下图进行硬件连接:

     

    1. 插入SIM卡
    2. 将天线连接至标识有 LTE 字样的天线连接座上。
    3. 连接EG912U开发板的串口和CP2102模块。
    4. 连接CP2102模块和电脑
    5. 使用Type-C数据线连接EG912U开发板

    设备开发

    开机

    完成硬件连接后,长按开发板上标识为PWk的按键,直到网络灯net闪烁,或电脑设备管理器的端口列表中出现包含Quectel USB 字样的 COM 口,表示开机成功。(EG91X系列C1-P02开发板上电自动开机)

    烧录固件包

    参考此章节烧录固件包 QPY_OCPU_BETA0002_EG912U_GLAA_FW.zip 至EG912U模组开发板。

    脚本导入与运行

    1. 配置dev.json中使用的TCP服务器和UART的具体参数

    1. 参考此章节,将源码目录下 code文件夹中的所有文件导入到模组文件系统,如下图所示:

    1. 参考此章节,执行主程序文件demo.py

    2. 参考此章节,停止程序运行

    业务调试

    程序启动

    执行demo.py之后,程序会启动各个服务模块,并且连接上配置的TCP服务器

    电表上传数据

    使用串口助手发送信息模拟电表上传数据,串口助手发送16进制的RFC1662报文,内容为“Hello, I am Meter!”

    🚩 Warning
    发送的信息必须符合代码中RFC1662协议规定的报文格式

    例如: 需要发送“Hello I am Meter!”,将其组装成报文格式后16进制表示为:

    7E FF 03 00 21 00 12 BF 48 65 6C 6C 6F 20 49 20 61 6D 20 4D 65 74 65 72 21 9C 1F 7E

    再通过串口助手以16进制的方式发送给模组

    模组收到电表发来的RFC1662报文后会先进行解析,再发送给TCP服务器:

    TCP服务器收到电表发来的信息:

    TCP下行数据

    TCP服务器向模组发送信息“Hello,I am TCP_Server!”,模组收到后会先对信息进行RFC1662协议组包,再发送给电表

    🚩 Warning

    图中信息前后显示为乱码,这并非乱码,而是因为模组发过来的是一个完整的 RFC1662 协议数据包。串口助手不具备解包能力,因此属于正常现象。

    软件框架

    软件设计图

    UML 类图

    业务系统启动流程

    1. 实例化应用对象
    2. 导入配置 JSON 文件
    3. 初始化各应用拓展组件(此步骤会将各个应用拓展注册进主应用对象中,方便各拓展之间通信)
    4. 检测网路(此步骤会阻塞等待网络就绪,若等待超时则尝试 CFUN 切换以恢复网络)
    5. 加载应用拓展,并启动相关服务(用户可自定义实现)
    6. 系统进入正常运行状态(默认开启 SIM 卡和网络检测,若出现掉网情况,会自行尝试 CFUN 切换以恢复网络)

    具体实现

    组件交互时序图

    UartRFC1662ProtocolRFC1662ProtocolResolverClientrequest from meterbuild messagehandle businessbuild response messageresponse to metertcp data post throughUartRFC1662ProtocolRFC1662ProtocolResolverClientextensions communication process

    代码目录

    智能电表解决方案的代码托管于 github,代码的目录结构如下:

    .
    |-- LICENSE
    |-- README.md
    |-- code
    |   |-- business.py
    |   |-- constant.py
    |   |-- demo.py
    |   |-- dev.json
    |   |-- protocol.py
    |   `-- qframe
    |       |-- __init__.py
    |       |-- builtins
    |       |   |-- __init__.py
    |       |   |-- clients.py
    |       |   |-- network.py
    |       |   `-- uart.py
    |       |-- collections.py
    |       |-- core.py
    |       |-- datetime.py
    |       |-- globals.py
    |       |-- led.py
    |       |-- logging.py
    |       |-- ota.py
    |       |-- qsocket.py
    |       |-- serial.py
    |       `-- threading.py
    `-- docs
        `-- media
            |-- UML.png
            |-- init.png
            |-- system.png
            `-- ...
    

    基础框架

    智能电表解决方案基于 QFrame 应用框架开发。

    QFrame 应用框架是 QuecPython 开发的一个基础应用框架。点此查看该框架的设计和应用指导。

    一个应用程序往往会依赖多个业务模块,各业务模块之间可能存在耦合现象。
    在框架设计中,业务模块之间通信是采用星型结构设计,如下图所示:

    图中的 Mediator是一个中介对象(通常命名为 Application),各个业务模块之间通过 Application 对象通信,这种设计称为中介模式

    业务模块以应用拓展的形式安插在应用程序中,而各应用拓展之间的交互通过 Application 对象进行统一调度。

    应用对象和拓展

    基于 QFrame 框架的应用程序必须有一个调度各业务模块的的中心对象,即上文提到的 Application 对象;应用参数也是通过该对象配置。

    示例代码如下:

    def create_app(name='DTU', config_path='/usr/dev.json'):
        # init application instance
        _app = Application(name)
        # read settings from json file
        _app.config.from_json(config_path)
        #The code is omitted here
        return _app
    
    app = create_app()
    

    Application类原型:

    class Application(object):
        """Application Class"""
    
        def __init__(self, name):
            self.name = name
            self.config = LocalStorage()
            self.business_threads_pool = ThreadPoolExecutor(max_workers=4, enable_priority=True)
            self.submit = self.business_threads_pool.submit
            # init builtins dictionary and init common, we use OrderedDict to keep loading ordering
            self.extensions = OrderedDict()
            self.__append_builtin_extensions()
            # set global context variable
            CurrentApp.set(self)
            G.set(_AppCtxGlobals())
    
        #The code is omitted here
    
        def append_extension(self, extension):
            self.extensions[extension.name] = extension
    
        def mainloop(self):
            """load builtins"""
            for extension in self.extensions.values():
                if hasattr(extension, 'load'):
                    extension.load()
    

    应用拓展的定义与初始化

    应用拓展指的是被 Application 对象加载的业务模块。一般来说,应用拓展从 app.config 获取其自身的配置并在初始化时传递给应用实例。

    应用拓展的使用包含定义和初始化两部分。应用拓展提供一个名为 AppExtensionABC 的基类,定义如下:

    class AppExtensionABC(object):
        """Abstract Application Extension Class"""
    
        def __init__(self, name, app=None):
            self.name = name  # extension name
            if app:
                self.init_app(app)
    
        def init_app(self, app):
            # register into app, then, you can use `app.{extesion.name}` to get current extension instance
            app.append_extesion(self)
            raise NotImplementedError
    
        def load(self):
            # loading extension functions, this method will be called in `app.mainloop`
            raise NotImplementedError
    

    该基类被具体的应用拓展类继承,用来约束应用拓展类的接口定义。

    • 我们需要向初始化方法 __init__ 传入 Application 应用程序对象。在创建应用拓展对象时调用 init_app 来完成拓展的初始化动作;亦可不传入应用对象,而直接创建应用拓展对象,后面再显性调用 init_app 来完成初始化。
    • load 方法用来被 Application 对象调用,用于加载各应用拓展。

    应用拓展的使用

    当应用拓展继承基类 AppExtensionABC,实现了必要的接口功能以后,可参照下面两种不同方式的代码,加载应用拓展对象。

    方式一:

    app = Application(__name__)
    ext = ExtensionClass(app)
    

    方式二(如uart,client等拓展对象):

    ext = ExtensionClass()
    ext.init_app(app)
    
        rfc1662resolver.init_app(_app)
        uart.init_app(_app)
        client.init_app(_app)
    

    主应用程序

    demo.py 作为应用程序入口的脚本文件,它提供工厂函数 create_app,向该函数传入配置路径来初始化应用程序和加载各应用拓展。

    demo.py 示例代码如下:

    import checkNet
    from usr.qframe import Application
    from usr.business import rfc1662resolver, client, uart
    
    PROJECT_NAME = "QuecPython_Framework_DEMO"
    PROJECT_VERSION = "1.0.0"
    def poweron_print_once():
        checknet = checkNet.CheckNetwork(
            PROJECT_NAME,
            PROJECT_VERSION,
        )
        checknet.poweron_print_once()
    def create_app(name='DTU', config_path='/code/dev.json'):
      # initialize Application
      _app = Application(name)
      # read settings from json file
      _app.config.from_json(config_path)
    
      # init rfc1662resolver extension
      rfc1662resolver.init_app(_app)
      # init uart extension
      uart.init_app(_app)
      # init tcp client extension
      client.init_app(_app)
    
      return _app
    
    # create app with `create_app` factory function
    app = create_app()
    if __name__ == '__main__':
      poweron_print_once()
      # loading all extensions
      app.mainloop()
    

    应用拓展类

    主要应用拓展功能有三大类 rfc1662resolver(1662协议解析)、client(TCP 客户端)和 uart(串口读写),它们均被注册进应用对象 Application 中,方便互相协作。

    • rfc1662resolver:负责解析和组装 RFC1662 协议报文,(RFC1662ProtocolResolver 实例对象)。
    • client:TCP 客户端(BusinessClient 实例对象),负责与 TCP 服务器通信。
    • uart:串口客户端(UartBusiness 实例对象),负责串口读写。

     RFC1662ProtocolResolver

    该类是一个应用拓展类,是 RFC1662 协议数据的解析器,用于处理业务中传输的 RFC1662 协议数据,对该类数据进行解包组包的功能。

    该类提供如下方法:

    • resolve(msg)
      • 功能:处理一个 RFC1662 协议消息。行为是通过解析消息的 protocol(可以理解为协议报文的id),根据该 protocol 从注册表中查找该消息的处理函数,如果找到则调用函数处理,否则抛出 ValueError 异常。如何注册处理函数见装饰器函数 register
      • 参数msg 是一个 RFC1662Protocol 对象,该类是 RFC1662 协议的封装类,见后续介绍。
      • 返回值:None
      • 异常:若传入 msg 未能在注册表中查找到处理函数,则抛出 ValueError 异常。
    • register(protocol)
      • 功能:是一个装饰器函数,用于注册一个 protocol 的处理函数。
      • 参数protocol 可以理解为 RFC1662 协议的报文 id。
      • 返回值:原函数
    • tcp_to_meter_packet(data)
      • 功能:静态方法,将字节数据 data,组包成透传 RFC1662 数据包(0x2100),即 TCP 透传给表计的数据帧。
      • 参数data,字节类型。
      • 返回值:0x2100 协议包字节串
      • 异常:无
    • module_to_meter_packet(data)
      • 功能:静态方法,组 RFC1662 协议数据包(0x2200),即模块主动发向表计的数据帧
      • 参数:data 是一个列表,[get/set, id, data],其中:
        • get/setCOSEM.GET/COSEM.SET,分别对应值为 0xC0/0xC1
        • id:功能命令字
        • data:字节类型
      • 返回值:0x2200 协议包字节串

    示例代码如下:

    # we have inited a RFC1662ProtocolResolver object in `business.py` module
    # import `rfc1662resolver`
    from code.business import rfc1662resolver
    
    
    # decorate with protocol 0x2100
    @rfc1662resolver.register(0x2100)
    def handle2100(msg):
      """when get a 0x2100 message,this function will be called"""
      pass
    

     RFC1662Protocol

    该类是 RFC1662 协议的具体实现,包括解包和组包。该类对象是一个完整 RFC1662 协议包的封装形式。主要方法有:

    • build_rfc_0x2100:组 0x2100 协议包,返回字节,等同于 RFC1662ProtocolResolver.tcp_to_meter_packet
    • build_rfc_0x2200:组 0x2200 协议包,返回字节,等同于 RFC1662ProtocolResolver.module_to_meter_packet
    • build:类方法,用于解析一帧协议包,返回 RFC1662Protocol 对象。
    • replay_get:应答 get 指令, 判断成功失败
    • replay_set:应答 set 指令
    • reply_event:应答 event 信息

    TCP 客户端组件

    基类 TcpClient

    该类向用户开放了两个接口:

    • recv_callback 方法,用户通过重写该方法,实现对 TCP 服务器下行数据的业务处理。
    • send 方法,用户可调用该方法发送数据至服务器。

    代码如下:

    class TcpClient(object):
        # ...
        def recv_callback(self, data):
            raise NotImplementedError('you must implement this method to handle data received by tcp.')
    
        def send(self, data):
            # TODO: uplink data method
            pass
    
    子类 BusinessClient

    BusinessClient 通过重写 recv_callback 方法,实现将服务器下行数据打包为 RFC1662 格式的报文,并将数据转发至串口。

    代码如下:

    class BusinessClient(TcpClient):
    
        def recv_callback(self, data):
            # recv tcp data and send to uart
            data = RFC1662Protocol.build_rfc_0x2100(data)
            CurrentApp().uart.write(data)
    

    串口通信组件

    基类 Uart

    该类向用户开放了两个接口:

    • recv_callback 方法,用户通过重写该方法,实现对接收到的串口数据的业务处理。
    • send 方法,用户可调用该方法向串口发送数据。

    代码如下:

    class Uart(object):
        # ...
        def recv_callback(self, data):
            raise NotImplementedError('you must implement this method to handle data received from device.')
    
        def write(self, data):
            # TODO: write data to uart
            pass
    
    子类 UartBusiness

    UartBusiness 通过重写 recv_callback 方法,实现对接收到的串口数据的业务处理。

    class UartBusiness(Uart):
    
            def recv_callback(self, data):
                # parse 1662 protocol data
                pass
    

    在子类 UartBusiness  recv_callback 方法中解析 RFC1662 协议报文,构建消息对象后,通过rfc1662resolver.resolve 方法分发消息处理业务。

    编写业务程序

    在脚本文件 business.py 中定义一个全局的 rfc1662resolver 解析器,用于注册指定类型的消息处理函数。

    如下示例代码注册 0x2100 协议透传处理函数:

    # >>>>>>>>>> handle rfc1662 message received from uart <<<<<<<<<<
    @rfc1662resolver.register(0x2100)
    def handle2100(msg):
      """post data received to cloud"""
      # message body bytes
      data = msg.info().request_data()
      if data:
        # post data to tcp server by `client` extension register in Application
        CurrentApp().client.send(data)

    沟通无界,服务无限

    了解更多关于移远公司、产品和技术支持的信息。