examples/opcua/opcuaviewer#

(You can also check this code in the repository)

Download this example

# Copyright (C) 2021 The Qt Company Ltd.
# Copyright (C) 2018 Unified Automation GmbH
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause

"""PySide6 OpcUa Viewer Example"""

import sys
from mainwindow import MainWindow
from PySide6.QtCore import QCoreApplication
from PySide6.QtWidgets import QApplication


if __name__ == '__main__':
    app = QApplication(sys.argv)
    QCoreApplication.setApplicationName('Qt for Python OpcUa Viewer')
    initial_url = sys.argv[1] if len(sys.argv) > 1 else 'opc.tcp://localhost:48010'
    main_win = MainWindow(initial_url)
    main_win.show()
    sys.exit(app.exec())
# Copyright (C) 2021 The Qt Company Ltd.
# Copyright (C) 2018 Unified Automation GmbH
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause

from ui_certificatedialog import Ui_CertificateDialog

from PySide6.QtCore import QFile
from PySide6.QtNetwork import QSslCertificate
from PySide6.QtWidgets import QDialog, QPushButton


class CertificateDialog(QDialog):

    def __init__(self, parent=None):
        super(CertificateDialog, self).__init__(parent)
        self._cert = QSslCertificate()
        self._trustListDirectory = ''
        self._ui = Ui_CertificateDialog()
        self._ui.setupUi(self)
        self._ui.btnTrust.clicked.connect(self.saveCertificate)

    # Returns 0 if the connect should be aborted, 1 if it should be resumed.
    def showCertificate(self, message, der, trustListDirectory):
        certs = QSslCertificate.fromData(der, QSsl.Der)
        self._trustListDirectory = trustListDirectory

        # If it is an untrusted self-signed certificate we can allow to
        # trust it.
        if len(certs) == 1 and certs[0].isSelfSigned():
            self._cert = certs[0]
            self._ui.btnTrust.setEnabled(True)
        else:
            self._ui.btnTrust.setEnabled(False)

        for cert in certs:
            self._ui.certificate.appendPlainText(cert.toText())

        self._ui.message.setText(message)
        self._ui.certificate.moveCursor(QTextCursor.Start)
        self._ui.certificate.ensureCursorVisible()

        return self.exec()

    def saveCertificate(self):
        digest = self._cert.digest()
        path = "{}/{}.der".format(self._trustListDirectory, digest.toHex())

        file = QFile(path)
        if file.open(QIODevice.WriteOnly):
            file.write(m_cert.toDer())
            file.close()
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
 <class>CertificateDialog</class>
 <widget class="QDialog" name="CertificateDialog">
  <property name="geometry">
   <rect>
    <x>0</x>
    <y>0</y>
    <width>720</width>
    <height>423</height>
   </rect>
  </property>
  <property name="windowTitle">
   <string>Dialog</string>
  </property>
  <layout class="QVBoxLayout" name="verticalLayout">
   <item>
    <widget class="QLabel" name="message">
     <property name="text">
      <string>TextLabel</string>
     </property>
     <property name="alignment">
      <set>Qt::AlignLeading|Qt::AlignLeft|Qt::AlignTop</set>
     </property>
     <property name="wordWrap">
      <bool>true</bool>
     </property>
    </widget>
   </item>
   <item>
    <widget class="QPlainTextEdit" name="certificate">
     <property name="readOnly">
      <bool>true</bool>
     </property>
    </widget>
   </item>
   <item>
    <layout class="QHBoxLayout" name="horizontalLayout">
     <item>
      <spacer name="horizontalSpacer">
       <property name="orientation">
        <enum>Qt::Horizontal</enum>
       </property>
       <property name="sizeHint" stdset="0">
        <size>
         <width>40</width>
         <height>20</height>
        </size>
       </property>
      </spacer>
     </item>
     <item>
      <widget class="QPushButton" name="btnIgnore">
       <property name="text">
        <string>Ignore Error</string>
       </property>
      </widget>
     </item>
     <item>
      <widget class="QPushButton" name="btnTrust">
       <property name="text">
        <string>Trust Certificate</string>
       </property>
      </widget>
     </item>
     <item>
      <widget class="QPushButton" name="btnCancel">
       <property name="text">
        <string>Cancel</string>
       </property>
       <property name="default">
        <bool>true</bool>
       </property>
      </widget>
     </item>
    </layout>
   </item>
  </layout>
 </widget>
 <resources/>
 <connections>
  <connection>
   <sender>btnIgnore</sender>
   <signal>clicked()</signal>
   <receiver>CertificateDialog</receiver>
   <slot>accept()</slot>
   <hints>
    <hint type="sourcelabel">
     <x>475</x>
     <y>402</y>
    </hint>
    <hint type="destinationlabel">
     <x>374</x>
     <y>397</y>
    </hint>
   </hints>
  </connection>
  <connection>
   <sender>btnTrust</sender>
   <signal>clicked()</signal>
   <receiver>CertificateDialog</receiver>
   <slot>accept()</slot>
   <hints>
    <hint type="sourcelabel">
     <x>563</x>
     <y>404</y>
    </hint>
    <hint type="destinationlabel">
     <x>287</x>
     <y>395</y>
    </hint>
   </hints>
  </connection>
  <connection>
   <sender>btnCancel</sender>
   <signal>clicked()</signal>
   <receiver>CertificateDialog</receiver>
   <slot>reject()</slot>
   <hints>
    <hint type="sourcelabel">
     <x>670</x>
     <y>406</y>
    </hint>
    <hint type="destinationlabel">
     <x>717</x>
     <y>395</y>
    </hint>
   </hints>
  </connection>
 </connections>
</ui>
# Copyright (C) 2021 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause

from textwrap import dedent

from PySide6.QtCore import QCoreApplication, Qt
from PySide6.QtWidgets import (QDialog, QDialogButtonBox, QFrame, QLabel,
                               QVBoxLayout)


_DOC_URL = 'https://doc.qt.io/QtOPCUA/index.html'

_HELP_FORMAT = dedent("""
    <html></head><body><p>
    The {} can be used to browse
    servers from the <a href="{}">QtOpcUa</A> module, for example the
    waterpump simulation server, which runs on port 43344.</p></body></html>
    """)


class HelpDialog(QDialog):

    def __init__(self, parent=None):
        global _HELP_FORMAT, _DOC_URL

        super(HelpDialog, self).__init__(parent)
        name = QCoreApplication.applicationName()
        self.setWindowTitle('{} Help'.format(name))
        vbox_layout = QVBoxLayout(self)
        help_text = _HELP_FORMAT.format(name, _DOC_URL)
        display = QLabel(help_text, self, wordWrap=True,
                         openExternalLinks=True,
                         textInteractionFlags=Qt.TextBrowserInteraction)
        vbox_layout.addWidget(display)
        vbox_layout.addStretch()
        button_box = QDialogButtonBox(QDialogButtonBox.Close)
        vbox_layout.addWidget(button_box)
        button_box.rejected.connect(self.reject)
# Copyright (C) 2021 The Qt Company Ltd.
# Copyright (C) 2018 Unified Automation GmbH
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause

import os
from textwrap import dedent

from ui_mainwindow import Ui_MainWindow
from opcuamodel import OpcUaModel
from certificatedialog import CertificateDialog
from helpdialog import HelpDialog

from PySide6.QtCore import (QCoreApplication, QDir, qDebug, Qt, QtMsgType,
                            Slot, QUrl, qInstallMessageHandler, qWarning)
from PySide6.QtGui import QKeySequence
from PySide6.QtWidgets import (QApplication, QAbstractItemView,
                               QComboBox, QGridLayout, QHeaderView,
                               QHBoxLayout, QLabel, QLineEdit, QMainWindow,
                               QMenu, QMenuBar, QMessageBox, QPlainTextEdit,
                               QPushButton, QSizePolicy, QTreeView,
                               QVBoxLayout, QWidget)
from PySide6.QtOpcUa import (QOpcUa, QOpcUaApplicationDescription,
                             QOpcUaAuthenticationInformation, QOpcUaProvider,
                             QOpcUaErrorState, QOpcUaPkiConfiguration,
                             QOpcUaUserTokenPolicy)

_main_window = None
_MESSAGE_TYPES = {QtMsgType.QtWarningMsg: "Warning",
                  QtMsgType.QtCriticalMsg: "Critical",
                  QtMsgType.QtFatalMsg: "Fatal", QtMsgType.QtInfoMsg: "Info",
                  QtMsgType.QtDebugMsg: "Debug"}


def _messageHandler(msg_type, context, message):
    global _main_window

    if _main_window:
        _main_window.log('{}: {}'.format(_MESSAGE_TYPES[msg_type], message))


class MainWindow(QMainWindow):

    def __init__(self, initial_url, parent=None):
        global _main_window

        super(MainWindow, self).__init__(parent)

        _main_window = self

        self._end_point_list = []
        self._opcua_client = None
        self._client_connected = False
        self._identity = None
        self._pki_config = QOpcUaPkiConfiguration()
        self._endpoint = None
        self._old_messagehandler = qInstallMessageHandler(_messageHandler)

        self._ui = Ui_MainWindow()
        self._ui.setupUi(self)
        self.setWindowTitle(QCoreApplication.applicationName())
        self._ui.host.setText(initial_url)
        self._ui.quitAction.setShortcut(QKeySequence(Qt.CTRL | Qt.Key_Q))
        self._ui.quitAction.triggered.connect(self.close)
        self._ui.helpAction.setShortcut(QKeySequence(QKeySequence.HelpContents))
        self._ui.helpAction.triggered.connect(self.showHelpDialog)
        self._ui.aboutAction.triggered.connect(qApp.aboutQt)

        self._opcua_model = OpcUaModel(self._ui.centralwidget)
        self.mOpcUaProvider = QOpcUaProvider(self._ui.centralwidget)

        self.updateUiState()

        self._ui.opcUaPlugin.addItems(self.mOpcUaProvider.availableBackends())
        self._ui.treeView.setModel(self._opcua_model)

        if not self._ui.opcUaPlugin.count():
            self._ui.opcUaPlugin.setDisabled(True)
            self._ui.connectButton.setDisabled(True)
            error = "The list of available OPCUA plugins is empty. No connection possible."
            QMessageBox.critical(self, "No OPCUA plugins available", error)

        self._ui.host.returnPressed.connect(self.animateFindServersClick)
        self._ui.findServersButton.clicked.connect(self.findServers)
        self._ui.getEndpointsButton.clicked.connect(self.getEndpoints)
        self._ui.connectButton.clicked.connect(self.connectToServer)

        self.setupPkiConfiguration()

        self._identity = self._pki_config.applicationIdentity()

    def __del__(self):
        qInstallMessageHandler(self._old_messagehandler)

    @Slot()
    def animateFindServersClick(self):
        self._ui.findServersButton.animateClick()

    def setupPkiConfiguration(self):
        pkidir = os.path.join(os.path.dirname(__file__), 'pki')
        self._pki_config.setClientCertificateFile(pkidir + "/own/certs/opcuaviewer.der")
        self._pki_config.setPrivateKeyFile(pkidir + "/own/private/opcuaviewer.pem")
        self._pki_config.setTrustListDirectory(pkidir + "/trusted/certs")
        self._pki_config.setRevocationListDirectory(pkidir + "/trusted/crl")
        self._pki_config.setIssuerListDirectory(pkidir + "/issuers/certs")
        self._pki_config.setIssuerRevocationListDirectory(pkidir + "/issuers/crl")

        # create the folders if they don't exist yet
        self.createPkiFolders()

    def createClient(self):
        if not self._opcua_client:
            self._opcua_client = self.mOpcUaProvider.createClient(self._ui.opcUaPlugin.currentText())
        if not self._opcua_client:
            qWarning("Connecting to the given server failed. See the log for details.")
            QMessageBox.critical(self,  "Failed to connect to server", message)
            return

        self._opcua_client.connectError.connect(self.showErrorDialog)
        self._opcua_client.setApplicationIdentity(self._identity)
        self._opcua_client.setPkiConfiguration(self._pki_config)

        if QOpcUaUserTokenPolicy.TokenType.Certificate in self._opcua_client.supportedUserTokenTypes():
            authInfo = QOpcUaAuthenticationInformation()
            authInfo.setCertificateAuthentication()
            self._opcua_client.setAuthenticationInformation(authInfo)

        self._opcua_client.connected.connect(self.clientConnected)
        self._opcua_client.disconnected.connect(self.clientDisconnected)
        self._opcua_client.errorChanged.connect(self.clientError)
        self._opcua_client.stateChanged.connect(self.clientState)
        self._opcua_client.endpointsRequestFinished.connect(self.getEndpointsComplete)
        self._opcua_client.findServersFinished.connect(self.findServersComplete)

    @Slot()
    def findServers(self):
        locale_ids = []
        server_uris = []
        url = QUrl(self._ui.host.text())

        self.updateUiState()

        self.createClient()
        # set default port if missing
        if url.port() == -1:
            url.setPort(4840)

        if self._opcua_client:
            self._opcua_client.findServers(url, locale_ids, server_uris)
            qDebug("Discovering servers on {}".format(url.toString()))

    @Slot(list, QOpcUa.UaStatusCode)
    def findServersComplete(self, servers, status_code):
        server = QOpcUaApplicationDescription()
        if QOpcUa.isSuccessStatus(status_code):
            self._ui.servers.clear()
            for server in servers:
                self._ui.servers.addItems(server.discoveryUrls())
        self.updateUiState()

    @Slot()
    def getEndpoints(self):
        self._ui.endpoints.clear()
        self.updateUiState()
        if self._ui.servers.currentIndex() >= 0:
            self.createClient()
            self._opcua_client.requestEndpoints(self._ui.servers.currentText())

    @Slot(list, QOpcUa.UaStatusCode)
    def getEndpointsComplete(self, endpoints, status_code):
        index = 0
        modes = ["Invalid", "None", "Sign", "SignAndEncrypt"]
        if QOpcUa.isSuccessStatus(status_code):
            self._end_point_list = endpoints
            for endpoint in endpoints:
                if endpoint.securityMode() > len(modes):
                    qWarning("Invalid security mode")
                    continue

                endpointName = "{} ({})".format(endpoint.securityPolicy(),
                                                modes[endpoint.securityMode()])
                self._ui.endpoints.addItem(endpointName, index)
                index = index + 1

        self.updateUiState()

    @Slot()
    def connectToServer(self):
        if self._client_connected:
            self._opcua_client.disconnectFromEndpoint()
            return

        if self._ui.endpoints.currentIndex() >= 0:
            self._endpoint = self._end_point_list[self._ui.endpoints.currentIndex()]
            self.createClient()
            self._opcua_client.connectToEndpoint(self._endpoint)

    @Slot()
    def clientConnected(self):
        self._client_connected = True
        self.updateUiState()

        self._opcua_client.namespaceArrayUpdated.connect(self.namespacesArrayUpdated)
        self._opcua_client.updateNamespaceArray()

    @Slot()
    def clientDisconnected(self):
        self._client_connected = False
        self._opcua_client = None
        self._opcua_model.setOpcUaClient(None)
        self.updateUiState()

    @Slot(list)
    def namespacesArrayUpdated(self, namespace_array):
        if not namespace_array:
            qWarning("Failed to retrieve the namespaces array")
            return

        self._opcua_client.namespaceArrayUpdated.disconnect(self.namespacesArrayUpdated)
        self._opcua_model.setOpcUaClient(self._opcua_client)
        self._ui.treeView.header().setSectionResizeMode(1, QHeaderView.Interactive)

    def clientError(self, error):
        qWarning("Client error changed {}".format(error))

    def clientState(self, state):
        qDebug("Client state changed {}".format(state))

    def updateUiState(self):
        # allow changing the backend only if it was not already created
        self._ui.opcUaPlugin.setEnabled(not self._opcua_client)
        text = "Disconnect" if self._client_connected else "Connect"
        self._ui.connectButton.setText(text)

        if self._client_connected:
            self._ui.host.setEnabled(False)
            self._ui.servers.setEnabled(False)
            self._ui.endpoints.setEnabled(False)
            self._ui.findServersButton.setEnabled(False)
            self._ui.getEndpointsButton.setEnabled(False)
            self._ui.connectButton.setEnabled(True)
        else:
            self._ui.host.setEnabled(True)
            self._ui.servers.setEnabled(self._ui.servers.count() > 0)
            self._ui.endpoints.setEnabled(self._ui.endpoints.count() > 0)
            self._ui.findServersButton.setDisabled(len(self._ui.host.text()) == 0)
            self._ui.getEndpointsButton.setEnabled(self._ui.servers.currentIndex() != -1)
            self._ui.connectButton.setEnabled(self._ui.endpoints.currentIndex() != -1)

        if not self._opcua_client:
            self._ui.servers.setEnabled(False)
            self._ui.endpoints.setEnabled(False)
            self._ui.getEndpointsButton.setEnabled(False)
            self._ui.connectButton.setEnabled(False)

    def log(self, text):
        self._ui.log.appendPlainText(text)

    def createPkiPath(self, path):
        msg = "Creating PKI path '{}': {}"
        dir = QDir()
        ret = dir.mkpath(path)
        if ret:
            qDebug(msg.format(path, "SUCCESS."))
        else:
            qWarning(msg.format(path, "FAILED."))
        return ret

    def createPkiFolders(self):
        result = self.createPkiPath(self._pki_config.trustListDirectory())
        if not result:
            return result

        result = self.createPkiPath(self._pki_config.revocationListDirectory())
        if not result:
            return result

        result = self.createPkiPath(self._pki_config.issuerListDirectory())
        if not result:
            return result

        result = self.createPkiPath(self._pki_config.issuerRevocationListDirectory())
        if not result:
            return result

        return result

    @Slot(QOpcUaErrorState)
    def showErrorDialog(self, error_state):
        result = 0
        status_code = QOpcUa.statusToString(error_state.errorCode())
        if error_state.isClientSideError():
            msg = "The client reported: "
        else:
            msg = "The server reported: "

        step = error_state.connectionStep()
        if step == QOpcUaErrorState.ConnectionStep.CertificateValidation:
            msg += dedent(
                """
                Server certificate validation failed with error {:04X} ({}).
                Click 'Abort' to abort the connect, or 'Ignore' to continue connecting.
                """).format(error_state.errorCode(), status_code)
            dialog = CertificateDialog(self)
            result = dialog.showCertificate(msg, self._endpoint.serverCertificate(),
                                            self._pki_config.trustListDirectory())
            error_state.setIgnoreError(result == 1)
        elif step == QOpcUaErrorState.ConnectionStep.OpenSecureChannel:
            msg += "OpenSecureChannel failed with error {:04X} ({}).".format(
                error_state.errorCode(), status_code)
            QMessageBox.warning(self, "Connection Error", msg)
        elif step == QOpcUaErrorState.ConnectionStep.CreateSession:
            msg += "CreateSession failed with error {:04X} ({}).".format(
                error_state.errorCode(), status_code)
            QMessageBox.warning(self, "Connection Error", msg)
        elif step == QOpcUaErrorState.ConnectionStep.ActivateSession:
            msg += "ActivateSession failed with error {:04X} ({}).".format(
                error_state.errorCode(), status_code)
            QMessageBox.warning(self, "Connection Error", msg)

    @Slot()
    def showHelpDialog(self):
        dialog = HelpDialog(self)
        dialog.exec()
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copy of the C++ example's UI file with the help action added. -->
<ui version="4.0">
 <class>MainWindow</class>
 <widget class="QMainWindow" name="MainWindow">
  <property name="geometry">
   <rect>
    <x>0</x>
    <y>0</y>
    <width>800</width>
    <height>600</height>
   </rect>
  </property>
  <property name="minimumSize">
   <size>
    <width>800</width>
    <height>0</height>
   </size>
  </property>
  <property name="windowTitle">
   <string>MainWindow</string>
  </property>
  <widget class="QWidget" name="centralwidget">
   <layout class="QGridLayout" name="gridLayout">
    <item row="1" column="0">
     <widget class="QTreeView" name="treeView">
      <property name="alternatingRowColors">
       <bool>true</bool>
      </property>
      <property name="selectionBehavior">
       <enum>QAbstractItemView::SelectItems</enum>
      </property>
     </widget>
    </item>
    <item row="0" column="0">
     <layout class="QVBoxLayout" name="verticalLayout">
      <item>
       <layout class="QGridLayout" name="gridLayout_2">
        <item row="0" column="0">
         <widget class="QLabel" name="label">
          <property name="text">
           <string>Select OPC UA Backend:</string>
          </property>
         </widget>
        </item>
        <item row="1" column="2">
         <widget class="QPushButton" name="findServersButton">
          <property name="text">
           <string>Find Servers</string>
          </property>
         </widget>
        </item>
        <item row="2" column="2">
         <widget class="QPushButton" name="getEndpointsButton">
          <property name="text">
           <string>Get Endpoints</string>
          </property>
         </widget>
        </item>
        <item row="2" column="1">
         <widget class="QComboBox" name="servers"/>
        </item>
        <item row="2" column="0">
         <widget class="QLabel" name="label_3">
          <property name="text">
           <string>Select OPC UA Server:</string>
          </property>
         </widget>
        </item>
        <item row="1" column="1">
         <widget class="QLineEdit" name="host">
          <property name="clearButtonEnabled">
           <bool>true</bool>
          </property>
         </widget>
        </item>
        <item row="0" column="1">
         <widget class="QComboBox" name="opcUaPlugin"/>
        </item>
        <item row="1" column="0">
         <widget class="QLabel" name="label_2">
          <property name="text">
           <string>Select host to discover:</string>
          </property>
         </widget>
        </item>
        <item row="3" column="0">
         <widget class="QLabel" name="label_5">
          <property name="text">
           <string>Select OPC UA Endpoint:</string>
          </property>
         </widget>
        </item>
        <item row="3" column="1">
         <widget class="QComboBox" name="endpoints">
          <property name="sizePolicy">
           <sizepolicy hsizetype="Expanding" vsizetype="Preferred">
            <horstretch>0</horstretch>
            <verstretch>0</verstretch>
           </sizepolicy>
          </property>
         </widget>
        </item>
        <item row="3" column="2">
         <widget class="QPushButton" name="connectButton">
          <property name="text">
           <string>Connect</string>
          </property>
         </widget>
        </item>
       </layout>
      </item>
     </layout>
    </item>
    <item row="2" column="0">
     <widget class="QLabel" name="label_4">
      <property name="text">
       <string>Log:</string>
      </property>
     </widget>
    </item>
    <item row="3" column="0">
     <widget class="QPlainTextEdit" name="log">
      <property name="lineWrapMode">
       <enum>QPlainTextEdit::NoWrap</enum>
      </property>
      <property name="readOnly">
       <bool>true</bool>
      </property>
     </widget>
    </item>
   </layout>
  </widget>
  <widget class="QMenuBar" name="menubar">
   <property name="geometry">
    <rect>
     <x>0</x>
     <y>0</y>
     <width>800</width>
     <height>30</height>
    </rect>
   </property>
   <widget class="QMenu" name="menuFile">
    <property name="title">
     <string>File</string>
    </property>
    <addaction name="quitAction"/>
   </widget>
   <widget class="QMenu" name="menuHelp">
    <property name="title">
     <string>Help</string>
    </property>
    <addaction name="helpAction"/>
    <addaction name="aboutAction"/>
   </widget>
   <addaction name="menuFile"/>
   <addaction name="menuHelp"/>
  </widget>
  <widget class="QStatusBar" name="statusbar"/>
  <action name="quitAction">
   <property name="text">
    <string>Quit</string>
   </property>
  </action>
  <action name="actionHelp">
   <property name="text">
    <string>Help</string>
   </property>
  </action>
  <action name="actionAbout_Qt">
   <property name="text">
    <string>About Qt</string>
   </property>
  </action>
  <action name="helpAction">
   <property name="text">
    <string>Help</string>
   </property>
  </action>
  <action name="aboutAction">
   <property name="text">
    <string>About Qt</string>
   </property>
  </action>
 </widget>
 <resources/>
 <connections/>
</ui>
# Copyright (C) 2021 The Qt Company Ltd.
# Copyright (C) 2018 Unified Automation GmbH
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause

from treeitem import TreeItem

from PySide6.QtGui import QIcon
from PySide6.QtCore import QAbstractItemModel, QModelIndex, Qt
from PySide6.QtOpcUa import QOpcUaClient, QOpcUaNode


_COLUMN_TITLES = ["BrowseName", "Value", "NodeClass", "DataType", "NodeId",
                  "DisplayName", "Description"]


class OpcUaModel(QAbstractItemModel):
    def __init__(self, parent):
        super(OpcUaModel, self).__init__(parent)
        self._opcua_client = None
        self._root_item = None

    def setOpcUaClient(self, client):
        self.beginResetModel()
        self._opcua_client = client
        if self._opcua_client:
            self._root_item = TreeItem(client.node("ns=0;i=84"), self)
        else:
            self._root_item = None
        self.endResetModel()

    def opcUaClient(self):
        return self._opcua_client

    def data(self, index, role):
        if not index.isValid():
            return None

        item = index.internalPointer()
        if role == Qt.DisplayRole:
            return item.data(index.column())
        if role == Qt.DecorationRole and index.column() == 0:
            return item.icon(index.column())

        return None

    def headerData(self, section, orientation, role):
        if role != Qt.DisplayRole:
            return None

        if orientation == Qt.Vertical:
            return "Row {}".format(section)

        if section < len(_COLUMN_TITLES):
            return _COLUMN_TITLES[section]
        return "Column {}".format(section)

    def index(self, row, column, parent):
        if not self.hasIndex(row, column, parent):
            return QModelIndex()

        item = parent.internalPointer().child(row) if parent.isValid() else self._root_item
        return self.createIndex(row, column, item) if item else QModelIndex()

    def parent(self, index):
        if not index.isValid():
            return QModelIndex()

        child_item = index.internalPointer()
        if child_item == self._root_item:
            return QModelIndex()
        parent_item = child_item.parentItem()
        if not parent_item:
            return QModelIndex()

        return self.createIndex(parent_item.row(), 0, parent_item)

    def rowCount(self, parent):
        if not self._opcua_client or parent.column() > 0:
            return 0

        if not parent.isValid():
            return 1  # only one root item

        parent_item = parent.internalPointer()
        return parent_item.childCount() if parent_item else 0

    def columnCount(self, parent):
        if parent.isValid():
            return parent.internalPointer().columnCount()
        return self._root_item.columnCount() if self._root_item else 0
# Copyright (C) 2021 The Qt Company Ltd.
# Copyright (C) 2018 Unified Automation GmbH
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause

from PySide6.QtGui import QColor, QPixmap
from PySide6.QtCore import Qt, qDebug, QObject, QMetaEnum, qWarning
from PySide6.QtOpcUa import (QOpcUa, QOpcUaArgument, QOpcUaAxisInformation,
                             QOpcUaClient, QOpcUaComplexNumber,
                             QOpcUaDoubleComplexNumber, QOpcUaEUInformation,
                             QOpcUaExtensionObject, QOpcUaLocalizedText,
                             QOpcUaQualifiedName, QOpcUaRange,
                             QOpcUaXValue)

# Columns: 0: NodeId, 1: Value, 2: NodeClass, 3: DataType, 4: BrowseName,
#          5: DisplayName, 6: Description
_numberOfDisplayColumns = 7
_object_pixmap = None
_variable_pixmap = None
_method_pixmap = None
_default_pixmap = None

_DESIRED_ATTRIBUTES = (QOpcUa.NodeAttribute.Value
                       | QOpcUa.NodeAttribute.NodeClass
                       | QOpcUa.NodeAttribute.Description
                       | QOpcUa.NodeAttribute.DataType
                       | QOpcUa.NodeAttribute.BrowseName
                       | QOpcUa.NodeAttribute.DisplayName)

_NODE_CLASSES = {QOpcUa.NodeClass.Undefined: 'Undefined',
                 QOpcUa.NodeClass.Object: 'Object',
                 QOpcUa.NodeClass.Variable: 'Variable',
                 QOpcUa.NodeClass.Method: 'Method',
                 QOpcUa.NodeClass.ObjectType: 'ObjectType',
                 QOpcUa.NodeClass.VariableType: 'VariableType',
                 QOpcUa.NodeClass.ReferenceType: 'ReferenceType',
                 QOpcUa.NodeClass.DataType: 'DataType',
                 QOpcUa.NodeClass.View: 'View'}


def create_pixmap(color):
    p = QPixmap(10, 10)
    p.fill(color)
    return p


class TreeItem(QObject):

    def __init__(self, node, model):
        super(TreeItem, self).__init__(None)

        self._opc_node = node
        self._model = model
        self._attributes_ready = False
        self._browse_started = False
        self._child_items = []
        self._child_node_ids = []
        self._parent_item = None
        self._node_browse_name = ''
        self._node_id = ''
        self._node_display_name = ''
        self._node_class = QOpcUa.NodeClass.Undefined

        self._opc_node.attributeRead.connect(self.handleAttributes)
        self._opc_node.browseFinished.connect(self.browseFinished)

        if not self._opc_node.readAttributes(_DESIRED_ATTRIBUTES):
            qWarning("Reading attributes {} failed".format(self._opc_node.nodeId))

    @staticmethod
    def create_from_browsing_data(node, model, browsingData, parent):
        result = TreeItem(node, model)
        result._parent_item = parent
        result._node_browse_name = browsingData.browseName().name()
        result._node_class = browsingData.nodeClass()
        result._node_id = browsingData.targetNodeId().nodeId()
        result._node_display_name = browsingData.displayName().text()
        return result

    def child(self, row):
        return self._child_items[row]

    def childIndex(self, child):
        return self._child_items.index(child)

    def childCount(self):
        self.startBrowsing()
        return len(self._child_items)

    def columnCount(self):
        return _numberOfDisplayColumns

    def data(self, column):
        if column == 0:
            return self._node_browse_name
        if column == 1:
            if not self._attributes_ready:
                return "Loading ..."
            attribute = self._opc_node.attribute(QOpcUa.NodeAttribute.DataType)
            type = attribute if attribute else ''
            value = self._opc_node.attribute(QOpcUa.NodeAttribute.Value)
            return str(value) if value else ''
        if column == 2:
            name = _NODE_CLASSES.get(self._node_class)
            return "{} ({})".format(name, self._node_class) if name else str(self._node_class)
        if column == 3:
            if not self._attributes_ready:
                return "Loading ..."
            attribute = self._opc_node.attribute(QOpcUa.NodeAttribute.DataType)
            typeId = attribute if attribute else ''
            enumEntry = QOpcUa.namespace0IdFromNodeId(typeId)
            if enumEntry == QOpcUa.NodeIds.Namespace0.Unknown:
                return typeId
            return "{} ({})".format(QOpcUa.namespace0IdName(enumEntry), typeId)
        if column == 4:
            return self._node_id
        if column == 5:
            return self._node_display_name
        if column == 6:
            if not self._attributes_ready:
                return "Loading ..."
            description = self._opc_node.attribute(QOpcUa.NodeAttribute.Description)
            return description.text() if description else ''
        return None

    def row(self):
        return self._parent_item.childIndex(self) if self._parent_item else 0

    def parentItem(self):
        return self._parent_item

    def appendChild(self, child):
        if not child:
            return
        if not self.hasChildNodeItem(child._node_id):
            self._child_items.append(child)
            self._child_node_ids.append(child._node_id)

    def icon(self, column):
        global _object_pixmap, _variable_pixmap, _method_pixmap, _default_pixmap

        if column != 0 or not self._opc_node:
            return QPixmap()
        if self._node_class == QOpcUa.NodeClass.Object:
            if not _object_pixmap:
                _object_pixmap = create_pixmap(Qt.darkGreen)
            return _object_pixmap
        if self._node_class == QOpcUa.NodeClass.Variable:
            if not _variable_pixmap:
                _variable_pixmap = create_pixmap(Qt.darkBlue)
            return _variable_pixmap
        if self._node_class == QOpcUa.NodeClass.Method:
            if not _method_pixmap:
                _method_pixmap = create_pixmap(Qt.darkRed)
            return _method_pixmap
        if not _default_pixmap:
            _default_pixmap = create_pixmap(Qt.gray)
        return _default_pixmap

    def hasChildNodeItem(self, nodeId):
        return nodeId in self._child_node_ids

    def startBrowsing(self):
        if self._browse_started:
            return

        if not self._opc_node.browseChildren():
            qWarning("Browsing node {} failed".format(self._opc_node.nodeId()))
        else:
            self._browse_started = True

    def handleAttributes(self, attr):
        if attr & QOpcUa.NodeAttribute.NodeClass:
            self._node_class = self._opc_node.attribute(QOpcUa.NodeAttribute.NodeClass)
        if attr & QOpcUa.NodeAttribute.BrowseName:
            name_attr = self._opc_node.attribute(QOpcUa.NodeAttribute.BrowseName)
            self._node_browse_name = name_attr.name()
        if attr & QOpcUa.NodeAttribute.DisplayName:
            display_name_attr = self._opc_node.attribute(QOpcUa.NodeAttribute.DisplayName)
            self._node_display_name = display_name_attr.text()

        self._attributes_ready = True

        row = self.row()
        start_index = self._model.createIndex(row, 0, self)
        end_index = self._model.createIndex(row, _numberOfDisplayColumns - 1, self)
        self._model.dataChanged.emit(start_index, end_index)

    def browseFinished(self, children, status_code):
        if status_code != QOpcUa.Good:
            qWarning("Browsing node {} finally failed: {}".format(
                self._opc_node.nodeId(), status_code))
            return
        row = self.row()
        start_index = self._model.createIndex(row, 0, self)
        for item in children:
            node_id = item.targetNodeId()
            if self.hasChildNodeItem(node_id.nodeId()):
                continue
            node = self._model.opcUaClient().node(node_id)
            if not node:
                qWarning("Failed to instantiate node: {}".format(node_id.nodeId()))
                continue

            child_item_count = len(self._child_items)
            self._model.beginInsertRows(start_index, child_item_count,
                                        child_item_count + 1)
            self.appendChild(TreeItem.create_from_browsing_data(node, self._model, item, self))
            self._model.endInsertRows()

        end_index = self._model.createIndex(row, _numberOfDisplayColumns - 1, self)
        self._model.dataChanged.emit(start_index, end_index)