返回主站|会员中心|保存桌面

库卡机器人及配件服务商    

库卡机器人、库卡机器人备品配件

联系方式
  • 联系人:吴经理
  • 电话:18576370666
新闻分类
  • 暂无分类
首页 > 新闻中心 > OPC Client数据采集!
新闻中心
OPC Client数据采集!
发布时间:2025-10-01        浏览次数:3        返回列表

OPC Client 数据采集是工业自动化中实现不同设备间数据交互的重要方式,它可以从 OPC Server 中读取或写入数据。下面我将介绍如何使用 Python 实现一个简单的 OPC Client 数据采集程序。

Python 中常用的 OPC 客户端库有pywin32(用于连接 OPC DA 服务器)和opcua(用于 OPC UA 服务器)。以下是两种常见的实现方式:

1. OPC DA 客户端实现(使用 pywin32)

适用于传统的 OPC DA 协议,需要在 Windows 环境下运行:

OPC DA客户端数据采集程序

V1

创建时间:11:02

2. OPC UA 客户端实现(使用 opcua 库)

OPC UA 是新一代的 OPC 标准,跨平台支持更好:

OPC UA客户端数据采集程序

import pythoncom

import win32com.client

import time


class OPCDAClient:

    def __init__(self, server_name, node_names):

        """

        初始化OPC DA客户端

        :param server_name: OPC服务器名称

        :param node_names: 要监控的节点名称列表

        """

        self.server_name = server_name

        self.node_names = node_names

        self.opc_server = None

        self.opc_groups = None

        self.opc_group = None

        self.items = None

        

    def connect(self):

        """连接到OPC服务器"""

        try:

            # 初始化COM

            pythoncom.CoInitialize()

            

            # 连接到OPC服务器

            self.opc_server = win32com.client.Dispatch(self.server_name)

            self.opc_groups = self.opc_server.OPCGroups

            

            # 创建一个组

            self.opc_group = self.opc_groups.Add("Group1")

            self.opc_group.IsActive = True

            self.opc_group.UpdateRate = 1000  # 刷新速率(ms)

            

            # 添加要监控的项

            self.items = self.opc_group.OPCItems.AddItems(

                len(self.node_names), 

                self.node_names, 

                list(range(len(self.node_names)))

            )

            

            print(f"成功连接到OPC服务器: {self.server_name}")

            return True

        except Exception as e:

            print(f"连接OPC服务器失败: {str(e)}")

            return False

    

    def read_data(self):

        """读取OPC服务器数据"""

        try:

            # 读取数据

            errors, values, qualities, timestamps = self.opc_group.SyncRead(

                0,  # 读取模式: 0=读取缓存

                len(self.node_names), 

                [item.ServerHandle for item in self.items]

            )

            

            # 整理结果

            result = {}

            for i, node_name in enumerate(self.node_names):

                result[node_name] = {

                    'value': values[i],

                    'quality': qualities[i],

                    'timestamp': timestamps[i]

                }

            

            return result

        except Exception as e:

            print(f"读取数据失败: {str(e)}")

            return None

    

    def write_data(self, node_name, value):

        """写入数据到OPC服务器"""

        try:

            # 找到节点索引

            index = self.node_names.index(node_name)

            item = self.items[index]

            

            # 写入数据

            error = self.opc_group.SyncWrite(

                1,  # 写入模式

                [item.ServerHandle],

                [value]

            )

            

            return error[0] == 0  # 如果返回0则表示成功

        except Exception as e:

            print(f"写入数据失败: {str(e)}")

            return False

    

    def disconnect(self):

        """断开与OPC服务器的连接"""

        if self.opc_server:

            self.opc_groups.RemoveAll()

            self.opc_server.Disconnect()

            print("已断开与OPC服务器的连接")

        

        # 释放COM资源

        pythoncom.CoUninitialize()


if __name__ == "__main__":

    # OPC服务器名称(例如:"Kepware.KEPServerEX.V6")

    opc_server_name = "Kepware.KEPServerEX.V6"

    

    # 要监控的节点列表

    nodes = [

        "Channel1.Device1.Tag1",

        "Channel1.Device1.Tag2",

        "Channel1.Device1.Tag3"

    ]

    

    # 创建客户端实例

    client = OPCDAClient(opc_server_name, nodes)

    

    # 连接服务器

    if client.connect():

        try:

            # 循环读取数据

            while True:

                data = client.read_data()

                if data:

                    print("\n--- 数据采集结果 ---")

                    for node, info in data.items():

                        print(f"{node}: {info['value']} (质量: {info['quality']})")

                

                # 每2秒读取一次

                time.sleep(2)

        except KeyboardInterrupt:

            print("\n用户中断程序")

        finally:

            # 断开连接

            client.disconnect()

使用说明

  1. 安装必要的库

    • 对于 OPC DA 客户端:pip install pywin32

    • 对于 OPC UA 客户端:pip install opcua

  2. 配置参数

    • 根据实际的 OPC 服务器地址、节点名称或 ID 进行修改

    • OPC DA 的服务器名称通常如 "Kepware.KEPServerEX.V6"

    • OPC UA 的服务器 URL 通常如 "opc.tcp://localhost:4840/"

  3. 运行程序

    • 程序会周期性地从 OPC 服务器读取数据并打印

    • 按 Ctrl+C 可以停止程序

注意事项

  1. OPC DA 通常需要在 Windows 系统上运行,而 OPC UA 可以跨平台

  2. 确保防火墙设置允许客户端与 OPC 服务器通信

  3. 不同的 OPC 服务器可能有不同的节点命名规则,需要参考对应服务器的文档

  4. 对于大规模数据采集,建议使用异步读取方式提高效率

  5. 实际应用中应添加更完善的错误处理和重连机制

选择哪种实现方式取决于你的 OPC 服务器类型和实际应用场景。OPC UA 是更现代、更灵活的选择,推荐在新项目中优先使用。

收缩
  • QQ咨询

  • 电话咨询

  • 18576370666
  • 添加微信客服