基于EG912U的Smart Tracker解决方案

基于EG912U的Smart Tracker解决方案

该Smart Tracker解决方案基于QuecPython EG912U 标准开发板实现,具有以下特性:

  • 提供准确的高精度GNSS定位。
  • 强大丰富的接口。
  • 使用python语言,便于二次开发
  • 开发资源
  • 快速上手
  • 软件设计

硬件资料

  • tracker box成品

    • 购买渠道:联系移远工作人员购买

软件资料

开发工具

  • QPYcom - QuecPython 调试工具

    • 版本:V3.9.0

    • 下载 QPYcom

  • VSCode - 代码编辑器

实验源码

  • 版本:v2.0.1

  • 从 Github 仓库获取源代码步骤如下:

    git clone https://github.com/QuecPython/tracker-box
    cd tracker-box
    git checkout tracker-box-EG912U

硬件准备

环境搭建

设置您的智能追踪器

按照以下步骤为智能追踪器供电:

  1. 开机前,将 nano SIM 卡(需开通数据服务)插入 SIM 卡槽。若开机后插入 SIM 卡,需重启设备。
  2. 通过以下任一方式开机:
  • 方式 1:按下设备侧面的电源按钮。

  • 方式 2:使用随附的 USB-C 线缆将智能追踪器连接至笔记本电脑或充电宝,设备将自动开机。

3.开机后,智能追踪器将连接至蜂窝网络并链接到 Acceleronix 资产管理 SaaS 平台。

连接至资产管理 SaaS 平台

Acceleronix 开发的资产管理 SaaS 平台为特定行业提供全面解决方案。它涵盖从 TSL 模型功能定义到 SaaS 平台管理及移动应用设备控制的全业务流程。

该资产管理 SaaS 平台支持用户对设备进行操作,可基于 TSL 模型对硬件数据进行动态分析。它集成了水质、温度、湿度、光线、二氧化碳等多种传感器,以适应不同场景。功能包括实时数据查看、历史运行曲线、数据聚合,以及基于位置设备的设备地图与轨迹回放。

设备开发

开机

完成硬件连接工作后,当PWR,SCK1亮起或电脑设备管理器的端口列表出现包含Quectel USB字样的COM口,表示开机成功

烧录固件包

参考此章节,烧录固件包至开发板。

脚本导入与运行

1.参考此章节,将源码目录下的code文件夹中的所有文件按原目录结构导入到模组文件系统中,如下图所示

2.参考此章节,执行主程序文件_main.py

3.参考此章节,停止程序运行。

业务调试

程序启动

执行_main.py脚本后,程序开始运行,会打印拨号信息,包括拨号状态、IP地址、DNS服务器地址,设备号等

数据检测

开始运行后会打印启动信息

查看设备位置(LBS)

  1. 点击访问云平台,在设备列表中点击 详情,打开设备详情页。
  2. 进入 位置 标签页查看 LBS 位置数据。

注意:位置每 30 分钟更新一次。如需手动刷新,请点击应用控制面板中的刷新按钮,然后重新加载页面即可查看更新后的位置。

查看和管理传感器数据

  1. 当设备在线时,在 Wonderfree 应用中点击设备进入控制面板。

  2. 查看智能追踪器的实时传感器数据和 LBS 位置数据。

  3. 如需更新传感器数据或 LBS 位置,点击控制面板右上角的刷新按钮

软件框架

软件设计图

代码讲解

等待网络就绪

main.py的wait_network_ready类是计算最大等待次数(WAIT_NETWORK_READY_S)秒除以5秒间隔。如果网络就绪则返回True,超时则返回False。

def wait_network_ready():
    wait_cnt = WAIT_NETWORK_READY_S / 5
    is_ready = False

    while wait_cnt:        
        lte = dataCall.getInfo(1, 0)
        if lte[2][0] == 1:
            is_ready = True                        
            break

        utime.sleep(5)
        wait_cnt -= 1    

    return is_ready

创建应用实例

create_app创建应用实例的函数。主要功能包括:创建Application应用对象,设置名称和版本初始化应用配置,加载指定路径的配置文件初始化各种服务模块(QTH客户端、GNSS服务、电池服务等),返回配置好的应用实例。

def create_app(name="SimpliKit", version="1.0.0", config_path="/usr/config.json"):
    _app = Application(name, version)
    _app.config.init(config_path)

    qth_client.init_app(_app)
    lbs_service.init_app(_app)
    gnss_service.init_app(_app) 
    sensor_service.init_app(_app)

    return _app

LBS定位服务

init 方法初始化服务,可选绑定到应用实例 app

load 方法启动一个线程运行 start_update,用于周期性更新和发送LBS数据

read 方法通过 net.getCellInfo() 获取基站信息,格式化为特定字符串(如 $LBS,...

start_update 方法在事件触发时,读取LBS数据并尝试发送到服务器,失败则重试或等待。成功后等待300秒,否则每2秒重试。

put_lbs 方法实现单次发送LBS数据,成功后退出循环。

class LbsService(object):

    def __init__(self, app=None):
        self.__net = net
        if app is not None:
            self.init_app(app)

    def __str__(self):
        return '{}'.format(type(self).__name__)
    def init_app(self, app):
        self.event = app.event
        app.register('lbs_service', self)

    def load(self):
        logger.info('loading {} extension, init lbs will take some seconds'.format(self))
        Thread(target=self.start_update).start()

    def read(self):
        cell_info = net.getCellInfo()
        if cell_info != -1 and cell_info[2]:
            first_tuple = cell_info[2]
            mcc_decimal = first_tuple[0][2]  # Retrieve the decimal MCC (e.g., 1120)
            #mcc_hex = "{:x}".format(mcc_decimal).upper()  # Convert to hexadecimal (e.g., '460')

            lbs_data = "$LBS,{},{},{},{},{},0*69;".format(
                mcc_decimal,
                first_tuple[0][3],
                first_tuple[0][5],
                first_tuple[0][1],
                first_tuple[0][7]
            )
            return lbs_data

    def read(self):
        cell_info = net.getCellInfo()
        if cell_info != -1 and cell_info[2]:
            first_tuple = cell_info[2]
            mcc_decimal = first_tuple[0][2]  # Retrieve the decimal MCC (e.g., 1120)
            #mcc_hex = "{:x}".format(mcc_decimal).upper()  # Convert to hexadecimal (e.g., '460')

            lbs_data = "$LBS,{},{},{},{},{},0*69;".format(
                mcc_decimal,
                first_tuple[0][3],
                first_tuple[0][5],
                first_tuple[0][1],
                first_tuple[0][7]
            )
            return lbs_data     

    def put_lbs(self):
            while True:
                lbs_data = self.read()
                if lbs_data is None:
                    utime.sleep(2)
                    continue

                for _ in range(3):
                    with CurrentApp().qth_client:
                        if CurrentApp().qth_client.sendLbs(lbs_data):
                            break
                else:
                    logger.debug("send lbs data to qth server fail, next report will be after 2 seconds")
                    utime.sleep(2)
                    continue

                logger.debug("send LBS data to qth server success")
                break            

GNSS定位服务

管理GNSS模块的初始化和状态控制

与应用程序集成 (init_app)

设置定位更新间隔 (update_interval)

class GnssService(object):

    def __init__(self, app=None):
        self.interval = 300
        self.__gnss = quecgnss

        if app is not None:
            self.init_app(app)

    def __str__(self):
        return '{}'.format(type(self).__name__)
    def init_app(self, app):
        self.event = app.event
        self.gnss_sleep_event = app.gnss_sleep_event
        self.interval = app.config["SLEEP_INTERVAL_SECONDS"]
        app.register('gnss_service', self)
    def load(self):
        logger.info('loading {} extension, init quecgnss will take some seconds'.format(self))
        result = self.init()
        logger.info('{} init gnss res: {}'.format(self, result))
        if result:
            Thread(target=self.start_update).start()

    def init(self):
        if self.__gnss.init() != 0:
            logger.warn('{} gnss init FAILED'.format(self))
            return False
        return True
    def status(self):
        # 0	int	GNSS模块处于关闭状态
        # 1	int	GNSS模块固件升级中
        # 2	int GNSS模块定位中,这种模式下即可开始读取GNSS定位数据,定位数据是否有效需要用户获取到定位数据后,解析对应语句来判断,比如判断GNRMC语句的status是 A 还是 V,A 表示定位有效,V表示定位无效。
        return self.__gnss.get_state()

    def enable(self, flag=True):
        return self.__gnss.gnssEnable(bool(flag)) == 0
    def read(self, size=4096):
        raw = self.__gnss.read(size)
        if raw != -1:
            size, data = raw
            # KHK
            #logger.debug('gnss read raw {} bytes data:\n{}'.format(size, data))
            return NmeaDict.load(data)
    def check_gnss_signal(self, nmea_dict):

        snr_threshold = 15        
        min_sats = 3
        has_3d_fix = False
        if "$GNGSA" in nmea_dict:
            for line in nmea_dict["$GNGSA"]:
                parts = line.split(",")
                if len(parts) > 2 and (parts[2] == "3" or parts[2] == "2"):
                    has_3d_fix = True
                    break
        if not has_3d_fix:
            return False

        snrs = []

        def extract_snrs(lines):
            for line in lines:
                parts = line.split(",")
                i = 4
                while i + 3 < len(parts):
                    snr_str = parts[i + 3]
                    if snr_str.isdigit():
                        snrs.append(int(snr_str))
                    i += 4
        if "$GPGSV" in nmea_dict:
            extract_snrs(nmea_dict["$GPGSV"])

        if "$GBGSV" in nmea_dict:
            extract_snrs(nmea_dict["$GBGSV"])

        if "$GAGSV" in nmea_dict:
            extract_snrs(nmea_dict["$GAGSV"])

        # count satelites with SNR > 15
        count = 0
        for snr in snrs:
            if snr > snr_threshold:
                count += 1
                if count >= min_sats:
                    return True
        return False

    def update_interval(self,interval):  
        ...

传感器数据采集

SensorService.py的SensorService 类是一个用于管理传感器数据采集和更新的服务类,主要负责初始化 I2C 通道和多个传感器(SHTC3、LPS22HB 和 TCS34725),并持续从这些传感器中读取温度、湿度、气压和 RGB 颜色值。它还负责将这些数据发送到指定的QTH 客户端。

class SensorService(object):

    def __init__(self, app=None):
        # i2c channel 0 
        self.i2c_channel0 = I2C(I2C.I2C1, I2C.STANDARD_MODE)
        # SHTC3
        self.shtc3 = Shtc3(self.i2c_channel0, SHTC3_SLAVE_ADDR)
        self.shtc3.init()
        # LPS22HB
        self.lps22hb = Lps22hb(self.i2c_channel0, LPS22HB_SLAVE_ADDRESS)
        self.lps22hb.init()
        # TCS34725
        self.tcs34725 = Tcs34725(self.i2c_channel0, TCS34725_SLAVE_ADDR)
        self.tcs34725.init()

        if app is not None:
            self.init_app(app)

    def __str__(self):
        return '{}'.format(type(self).__name__)

    def init_app(self, app):
        app.register('sensor_service', self)

    def load(self):
        logger.info('loading {} extension, init sensors will take some seconds'.format(self))
        Thread(target=self.start_update).start()            
    def get_temp1_and_humi(self):
        return self.shtc3.getTempAndHumi()

    def get_press_and_temp2(self):
        return self.lps22hb.getTempAndPressure()
    def get_rgb888(self):
            rgb888 = self.tcs34725.getRGBValue()
            logger.debug("R: {}, G: {}, B: {}".format((rgb888 >> 16) & 0xFF, (rgb888 >> 8) & 0xFF, rgb888 & 0xFF))

            r = (rgb888 >> 16) & 0xFF
            g = (rgb888 >> 8) & 0xFF
            b = rgb888 & 0xFF
            return r, g, b   
    def start_update(self):
        prev_temp1 = None
        prev_humi = None
        prev_press = None
        prev_temp2 = None
        prev_rgb888 = None


        while True:
            data = {}
            try:
                temp1, humi = self.shtc3.getTempAndHumi()
                logger.debug("temp1: {:0.2f}, humi: {:0.2f}".format(temp1, humi))

                if prev_temp1 is None or abs(prev_temp1 - temp1) > 1:
                    data.update({3: round(temp1, 2)})
                    prev_temp1 = temp1

                if prev_humi is None or abs(prev_humi - humi) > 1:
                    data.update({4: round(humi, 2)})
                    prev_humi = humi            
            except Exception as e:
                logger.error("getTempAndHumi error:{}".format(e))

            utime.sleep_ms(100)

            try:
                press, temp2 = self.lps22hb.getTempAndPressure()
                logger.debug("press: {:0.2f}, temp2: {:0.2f}".format(press, temp2))

                if prev_temp2 is None or abs(prev_temp2 - temp2) > 1:
                    data.update({5: round(temp2, 2)})
                    prev_temp2 = temp2

                if prev_press is None or abs(prev_press - press) > 1:
                    data.update({6: round(press, 2)})
                    prev_press = press

            except Exception as e:
                logger.error("getTempAndPressure error:{}".format(e))

            utime.sleep(1)            


QTH平台客户端

QthClient 类是一个用于与 QTH(Quantum Technology Hub)平台进行通信的客户端类。它负责初始化、启动和管理与 QTH 平台的连接,并处理来自平台的各种事件和回调。

logger = getLogger(__name__)
class QthClient(object):

    def __init__(self, app=None):
        self.opt_lock = Lock()
        if app:
            self.init_app(app)
 def __enter__(self):
        self.opt_lock.acquire()
        return self

    def __exit__(self, *args, **kwargs):
        self.opt_lock.release()

    def init_app(self, app):
        app.register("qth_client", self)
        Qth.init()               
        Qth.setProductInfo(app.config["QTH_PRODUCT_KEY"], app.config["QTH_PRODUCT_SECRET"])
        Qth.setServer(app.config["QTH_SERVER"])
        Qth.setEventCb(
            {
                "devEvent": self.eventCallback, 
                "recvTrans": self.recvTransCallback, 
                "recvTsl": self.recvTslCallback, 
                "readTsl": self.readTslCallback, 
                "readTslServer": self.recvTslServerCallback,
                "ota": {
                    "otaPlan":self.otaPlanCallback,
                    "fotaResult":self.fotaResultCallback
                }
            }
        )
def load(self):
        self.start()

    def start(self):
        Qth.start()
        while not self.isStatusOk():
            utime.sleep(3)

    def stop(self):
        Qth.stop()

    def sendTsl(self, mode, value):
        return Qth.sendTsl(mode, value)

    def isStatusOk(self):
        return Qth.state()

    def sendLbs(self, lbs_data):
        return Qth.sendOutsideLocation(lbs_data)

    def sendGnss(self, nmea_data):
        return Qth.sendOutsideLocation(nmea_data)

    def eventCallback(self, event, result):
        logger.info("dev event:{} result:{}".format(event, result))
        if(2== event and 0 == result):
            Qth.otaRequest()
def recvTransCallback(self, value):
        ret =Qth.sendTrans(1, value)
        logger.info("recvTrans value:{} ret:{}".format(value, ret))

    def recvTslCallback(self, value):
        logger.info("recvTsl:{}".format(value))
        for cmdId, val in value.items():
            logger.info("recvTsl {}:{}".format(cmdId, val))

            if cmdId == 8:
                CurrentApp().gnss_service.update_interval(val)
                CurrentApp().lbs_service.update_interval(val)

    def readTslCallback(self, ids, pkgId):
        logger.info("readTsl ids:{} pkgId:{}".format(ids, pkgId))
        value=dict()
         temp1, humi =CurrentApp().sensor_service.get_temp1_and_humi()
        press, temp2 = CurrentApp().sensor_service.get_press_and_temp2()
        r,g,b = CurrentApp().sensor_service.get_rgb888()


        for id in ids:
            if 3 == id:
                value[3]=temp1
            elif 4 == id:
                value[4]=humi
            elif 5 == id:
                value[5]=temp2
            elif 6 == id:
                value[6]=press
            elif 7 == id:
                value[7]={1:r, 2:g, 3:b}
        Qth.ackTsl(1, value, pkgId)


    def recvTslServerCallback(self, serverId, value, pkgId):
        logger.info("recvTslServer serverId:{} value:{} pkgId:{}".format(serverId, value, pkgId))
        Qth.ackTslServer(1, serverId, value, pkgId)
def otaPlanCallback(self, plans):
        logger.info("otaPlan:{}".format(plans))
        Qth.otaAction(1)

    def fotaResultCallback(self, comp_no, result):
        logger.info("fotaResult comp_no:{} result:{}".format(comp_no, result))

    def sotaInfoCallback(self, comp_no, version, url, md5, crc):
        logger.info("sotaInfo comp_no:{} version:{} url:{} md5:{} crc:{}".format(comp_no, version, url, md5, crc))
        # 当使用url下载固件完成,且MCU更新完毕后,需要获取MCU最新的版本信息,并通过setMcuVer进行更新
        Qth.setMcuVer("MCU1", "V1.0.0", self.sotaInfoCallback, self.sotaResultCallback)

    def sotaResultCallback(comp_no, result):
        logger.info("sotaResult comp_no:{} result:{}".format(comp_no, result))

沟通无界,服务无限

了解更多关于移远公司、产品和技术支持的信息。