Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
Microsoft Fabric is an all-in-one analytics solution that covers everything from data movement to data science, analytics, and business intelligence. It offers a comprehensive suite of services, including data lake, data engineering, and data integration, all in one place. You don't even need an Azure subscription for it, let alone deploy or manage any apps or services. You can get started with Microsoft Fabric here.
Create a Fabric Eventhouse to Store your Production Line Data
- Log into Microsoft Fabric here.
- Create an
Eventhouseby opening your workspace, selectingNew item, then searching for and selectingEventhouse. Give it a name, e.g.opcua, and clickCreate. Both the eventhouse and a default KQL database with the same name are created. - Select your KQL database. In the
Database detailspane, under theOneLakesection, setAvailabilitytoEnabled. This will enable sharing your OPC UA time-series data from your production line within your organization via OneLake in Parquet file format.
Configure OPC UA PubSub Data Ingestion
These tables, mappings, functions and the materialized view mirror the ones the reference solution creates in Azure Data Explorer, so Fabric processes the OPC UA PubSub data exactly the same way ADX does.
Create the tables you need for ingesting the OPC UA PubSub data by clicking opcua_queryset, deleting the sample data in the text box, entering the following Kusto commands one-by-one, and then clicking Run for each command:
// Create a landing table for raw OPC UA telemetry
.create table opcua_raw(payload: dynamic)
// Create an intermediate table to unbatch our OPC UA PubSub messages into
.create table opcua_intermediate(DataSetWriterID: string, Timestamp: datetime, Payload: dynamic)
// Create our final OPC UA telemetry table
.create table opcua_telemetry (DataSetWriterID: string, Timestamp: datetime, Name: string, Value: dynamic)
// Create a landing table for raw OPC UA metadata
.create table opcua_metadata_raw(payload: dynamic)
// Create an OPC UA metadata landing table
.create table opcua_metadata(DataSetWriterID: string, Timestamp: datetime, Name: string, Type: string, DisplayName:string, Workcell: string, Line: string, Area: string, Site: string, Enterprise: string, NamespaceUri: string, NodeId: string)
Then run the following Kusto commands one-by-one:
// Create a function to do the raw OPC UA expansion
.create-or-alter function OPCUARawExpand() { opcua_raw | mv-expand records = payload.Messages | where records != '' | project DataSetWriterID = tostring(records["DataSetWriterId"]), Timestamp = todatetime(records["Timestamp"]), Payload = todynamic(records["Payload"]) }
// Create a function to do the OPC UA dataset expansion
.create-or-alter function OPCUADatasetExpand() { opcua_intermediate | mv-apply Payload on (extend key = tostring(bag_keys(Payload)[0]) | extend p = Payload[key] | project Name = key, Value = todynamic(p.Value)) }
// Create a function to do the raw OPC UA metadata expansion
.create-or-alter function OPCUAMetaDataExpand() { opcua_metadata_raw | parse tostring(payload.MetaData.Name) with * ":" Workcell "." Line "." Area "." Site "." Enterprise ";nsu=" NamespaceUri ";" NodeId | project DataSetWriterId = tostring(payload.DataSetWriterId), Timestamp = todatetime(payload.Timestamp), Name = tostring(payload.MetaData.Name), Type = tostring(payload.MetaData.Fields[0].Description), DisplayName = tostring(payload.MetaData.Fields[0].Name), Workcell, Line, Area, Site, Enterprise, NamespaceUri, NodeId }
// Create a materialized view for the last known value (LKV) of our metadata
.create materialized-view opcua_metadata_lkv on table opcua_metadata { opcua_metadata | summarize arg_max(Timestamp, *) by Name, DataSetWriterID }
Then run the following Kusto commands one-by-one:
// Create mapping from JSON ingestion to the landing table
.create-or-alter table opcua_raw ingestion json mapping 'opcua_mapping' '[{"column":"payload","path":"$","datatype":"dynamic"}]'
// Apply the raw expansion function to the OPC UA raw table
.alter table opcua_intermediate policy update @'[{"Source": "opcua_raw", "Query": "OPCUARawExpand()", "IsEnabled": "True"}]'
// Apply the dataset expansion function to the intermediate table
.alter table opcua_telemetry policy update @'[{"Source": "opcua_intermediate", "Query": "OPCUADatasetExpand()", "IsEnabled": "True"}]'
// Create mapping from JSON ingestion to the metadata landing table
.create-or-alter table opcua_metadata_raw ingestion json mapping 'opcua_metadata_mapping' '[{"column":"payload","path":"$","datatype":"dynamic"}]'
// Apply the raw metadata expansion function to the metadata landing table
.alter table opcua_metadata policy update @'[{"Source": "opcua_metadata_raw", "Query": "OPCUAMetaDataExpand()", "IsEnabled": "True"}]'
Connect Fabric to your existing Azure Event Hubs
The reference solution deploys an Azure Event Hubs namespace named <resourcesName>-EventHubs (where <resourcesName> is the name you chose during deployment) that already receives your OPC UA PubSub data on two event hubs:
| Event hub | Contents | KQL landing table | Ingestion mapping |
|---|---|---|---|
data |
OPC UA PubSub telemetry | opcua_raw |
opcua_mapping |
metadata |
OPC UA PubSub metadata | opcua_metadata_raw |
opcua_metadata_mapping |
The Azure Data Explorer (ADX) cluster deployed by the solution consumes these Event Hubs through a dedicated adx consumer group. To let Fabric read the same data without interfering with ADX, create a separate consumer group for Fabric on each event hub. You can do this in the Azure portal under the event hub's Consumer groups blade. Call the new consumer group 'fabric'.
You will also need a connection string with at least Listen rights. The simplest option is the namespace-level RootManageSharedAccessKey policy: in the Azure portal, open your <resourcesName>-EventHubs namespace, select Shared access policies -> RootManageSharedAccessKey and copy the Connection string-primary key.
Ingest the telemetry event hub (data -> opcua_raw)
- In your Fabric workspace, select
New item, then search for and selectEventstream. Name it e.g.eventstream_opcua_dataand clickCreate. - Select
Add source->Azure Event Hubs. UnderConnection, selectNew connectionand enter your<resourcesName>-EventHubsnamespace, thedataevent hub, and theRootManageSharedAccessKeyshared access key name and key. Back on the source page, select thefabricconsumer group (or$Default) and setData formattoJson. SelectNext, then on theReview + connectpage selectAdd. Finally, selectPublishto publish the eventstream. - Select
Add destination->Eventhouse. ChooseDirect ingestion, enter aDestination name, then select yourWorkspace,Eventhouse, and the KQL database you created earlier. SelectSave, connect the destination card to your stream output if it isn't already, and selectPublish. - In
Live view, selectConfigureon the Eventhouse destination node to open theGet datascreen. Select the existingopcua_rawtable, keep or edit theData connection name, and selectNext. On theInspect the datascreen, confirm theFormatisJSON(the existingopcua_mappingroutes the raw payload into thepayloadcolumn; you can review it via theTable_mappingdropdown orAdvancedoptions). SelectFinish, then selectCloseon theSummaryscreen.
Ingest the metadata event hub (metadata -> opcua_metadata_raw)
- Create a second eventstream by selecting
New item->Eventstream, name it e.g.eventstream_opcua_metadataand clickCreate. - Select
Add source->Azure Event Hubs. Create or select a connection exactly as above, but set the event hub tometadata(consumer groupfabricor$Default). SetData formattoJson, selectNext, thenAddon theReview + connectpage, andPublishthe eventstream. - Select
Add destination->Eventhouse. ChooseDirect ingestion, enter aDestination name, then select yourWorkspace,Eventhouse, and the same KQL database. SelectSave, connect the destination card to your stream output if it isn't already, and selectPublish. - In
Live view, selectConfigureon the Eventhouse destination node to open theGet datascreen. Select the existingopcua_metadata_rawtable, keep or edit theData connection name, and selectNext. On theInspect the datascreen, confirm theFormatisJSON(the existingopcua_metadata_mappingroutes the raw payload into thepayloadcolumn; you can review it via theTable_mappingdropdown orAdvancedoptions). SelectFinish, then selectCloseon theSummaryscreen.
Once both eventstreams are running, the update policies and the opcua_metadata_lkv materialized view you created above automatically expand the raw OPC UA PubSub messages into the opcua_telemetry and opcua_metadata tables, exactly like the ADX deployment.
Create a Fabric Lakehouse to Share Your OPC UA Data within Your Organization
To share your OPC UA data via OneLake, create a Lakehouse by selecting New item in your workspace, then searching for and selecting Lakehouse. Give it a name, e.g. opcua_lake, and click Create.
- Under
Tables, selectNew shortcut, selectMicrosoft OneLake, select your KQL database, expand theTablesnode and selectopcua_telemetry. - Under
Tables, selectNew shortcut, selectMicrosoft OneLake, select your KQL database, expand theTablesnode and selectopcua_metadata.
View Your OPC UA Data Flow in Fabric
Click on your workspace, select Lineage view to see the entire flow of OPC UA data you have just setup in Microsoft Fabric.
Run a Sample Data Query
Open your KQL database and select its opcua_queryset. Delete the sample queries, enter the following query in the text box, and select Run:
let _startTime = ago(1h);
let _endTime = now();
opcua_metadata
| where Name contains "assembly"
| where Name contains "munich"
| join kind=inner (opcua_telemetry
| where Name == "Status"
| where Timestamp > _startTime and Timestamp < _endTime
) on DataSetWriterID
| extend energy = todouble(Value)
| project Timestamp1, energy
| sort by Timestamp1 desc
| render linechart
Import OPC UA Information Models from the UA Cloud Library (hosted by the OPC Foundation)
Beyond the metadata published via OPC UA PubSub, you can import entire OPC UA Information Models into your Eventhouse from the UA Cloud Library, an online store of OPC UA Information Models hosted by the OPC Foundation. Importing the OPC UA nodes defined in an Information Model into a table lets you look up richer semantics within your queries, including the full model hierarchy, complex type definitions and all available telemetry from your sites.
Because the Fabric Eventhouse KQL engine supports the http_request plugin, the queries below work in Fabric exactly like they do in ADX.
Register and find an Information Model
- Register for free at the UA Cloud Library: https://uacloudlibrary.opcfoundation.org/Identity/Account/Register.
- Browse the available Information Models at https://uacloudlibrary.opcfoundation.org/Explorer and note the unique ID of the model you want to import. You can find this ID in the URL of the model's page. For example, the
Stationnodeset used by this reference solution has the ID1627266626. - Create a basic authorization header from your UA Cloud Library credentials. Generate the Base64 hash with the bash command
echo -n 'username:password' | base64, or use a tool such as https://www.debugbear.com/basic-auth-header-generator.
Enable the http_request plugin and allow the UA Cloud Library endpoint
Unlike Azure Data Explorer, a Fabric Eventhouse has the http_request plugin disabled by default, so it must be enabled first. In your KQL database, click Explore your data and run the following commands (you need database admin permissions):
// Enable the http_request plugin used to call the UA Cloud Library REST API
.enable plugin http_request
// Allow Kusto to call the UA Cloud Library endpoint
.alter cluster policy callout @'[{"CalloutType": "webapi","CalloutUriRegex": "uacloudlibrary.opcfoundation.org","CanCall": true}]'
Import an Information Model
Run the following query to download an Information Model from the UA Cloud Library and expand its variable nodes. Replace <INFORMATION_MODEL_IDENTIFIER_FROM_THE_UA_CLOUD_LIBRARY> with the model's unique ID (for example 1627266626) and <HASHED_CLOUD_LIBRARY_CREDENTIALS> with your Base64-encoded credentials:
let uri='https://uacloudlibrary.opcfoundation.org/infomodel/download/<INFORMATION_MODEL_IDENTIFIER_FROM_THE_UA_CLOUD_LIBRARY>';
let headers=dynamic({'accept':'text/plain', 'Authorization':'Basic <HASHED_CLOUD_LIBRARY_CREDENTIALS>'});
evaluate http_request(uri, headers)
| project title = tostring(ResponseBody.['title']), contributor = tostring(ResponseBody.contributor.name), nodeset = parse_xml(tostring(ResponseBody.nodeset.nodesetXml))
| mv-expand UAVariable=nodeset.UANodeSet.UAVariable
| project-away nodeset
| extend NodeId = UAVariable.['@NodeId'], DisplayName = tostring(UAVariable.DisplayName.['#text']), BrowseName = tostring(UAVariable.['@BrowseName']), DataType = tostring(UAVariable.['@DataType'])
| project-away UAVariable
| take 10000
To persist the imported model into a table (for example opcua_information_model) so you can join it with your opcua_telemetry and opcua_metadata tables, wrap the same query with .set-or-append. The table is created automatically on the first run:
.set-or-append opcua_information_model <|
let uri='https://uacloudlibrary.opcfoundation.org/infomodel/download/<INFORMATION_MODEL_IDENTIFIER_FROM_THE_UA_CLOUD_LIBRARY>';
let headers=dynamic({'accept':'text/plain', 'Authorization':'Basic <HASHED_CLOUD_LIBRARY_CREDENTIALS>'});
evaluate http_request(uri, headers)
| project title = tostring(ResponseBody.['title']), contributor = tostring(ResponseBody.contributor.name), nodeset = parse_xml(tostring(ResponseBody.nodeset.nodesetXml))
| mv-expand UAVariable=nodeset.UANodeSet.UAVariable
| extend NodeId = tostring(UAVariable.['@NodeId']), DisplayName = tostring(UAVariable.DisplayName.['#text']), BrowseName = tostring(UAVariable.['@BrowseName']), DataType = tostring(UAVariable.['@DataType'])
| project title, contributor, NodeId, DisplayName, BrowseName, DataType
| take 10000
Visualize an Information Model as a graph
To view a graphical representation of an OPC UA Information Model, run the following query and switch the result view to Graph. For best results, set the Layout option to Grouped and the Labels to name:
let uri='https://uacloudlibrary.opcfoundation.org/infomodel/download/1627266626';
let headers=dynamic({'accept':'text/plain', 'Authorization':'Basic <HASHED_CLOUD_LIBRARY_CREDENTIALS>'});
let variables = evaluate http_request(uri, headers)
| project title = tostring(ResponseBody.['title']), contributor = tostring(ResponseBody.contributor.name), nodeset = parse_xml(tostring(ResponseBody.nodeset.nodesetXml))
| mv-expand UAVariable = nodeset.UANodeSet.UAVariable
| extend NodeId = UAVariable.['@NodeId'], ParentNodeId = UAVariable.['@ParentNodeId'], DisplayName = tostring(UAVariable['DisplayName']), DataType = tostring(UAVariable.['@DataType']), References = tostring(UAVariable.['References'])
| where References !contains "HasModellingRule"
| where DisplayName != "InputArguments"
| project-away nodeset, UAVariable, References;
let objects = evaluate http_request(uri, headers)
| project title = tostring(ResponseBody.['title']), contributor = tostring(ResponseBody.contributor.name), nodeset = parse_xml(tostring(ResponseBody.nodeset.nodesetXml))
| mv-expand UAObject = nodeset.UANodeSet.UAObject
| extend NodeId = UAObject.['@NodeId'], ParentNodeId = UAObject.['@ParentNodeId'], DisplayName = tostring(UAObject['DisplayName']), References = tostring(UAObject.['References'])
| where References !contains "HasModellingRule"
| project-away nodeset, UAObject, References;
let nodes = variables
| project source = tostring(NodeId), target = tostring(ParentNodeId), name = tostring(DisplayName)
| join kind=fullouter (objects
| project source = tostring(NodeId), target = tostring(ParentNodeId), name = tostring(DisplayName)) on source
| project source = coalesce(source, source1), target = coalesce(target, target1), name = coalesce(name, name1);
let edges = nodes;
edges
| make-graph source --> target with nodes on source
Useful KQL Database Helper-Functions for Advanced Queries
.create-or-alter function QuerySpecificValue(stationName: string, productionLineName: string, valueToQuery: string, desiredValue: real) {
opcua_metadata_lkv
| where Name contains stationName
| where Name contains productionLineName
| join kind = inner(opcua_telemetry
| where Name == valueToQuery
| where Value == desiredValue
| where Timestamp > ago(5m)
) on DataSetWriterID
| project Timestamp1
| sort by Timestamp1 desc
| take 1
}
.create-or-alter function QuerySpecificTime(stationName: string, productionLineName: string, valueToQuery: string, timeToQuery: datetime, idealCycleTime: timespan) {
opcua_metadata_lkv
| where Name contains stationName
| where Name contains productionLineName
| join kind = inner(opcua_telemetry
| where Name == valueToQuery
| where Timestamp > ago(5m)
) on DataSetWriterID
| where around(Timestamp1, timeToQuery, idealCycleTime)
| sort by Timestamp1 desc
| project Value
| take 1
}
.create-or-alter function EnergyPerPart(productionLineName: string, idealCycleTime: timespan) {
// check if a new part was produced (last machine in the production line, i.e. packaging, is in state 2 ("done") with a passed QA)
// and get the part's serial number and energy consumption at that time
let timeLatestProductWasProduced = toscalar(QuerySpecificValue("packaging", productionLineName, "Status", "2"));
let serialNumber = toscalar(QuerySpecificTime("packaging", productionLineName, "ProductSerialNumber", timeLatestProductWasProduced, idealCycleTime));
//
let timePartWasProducedPackaging = toscalar(timeLatestProductWasProduced);
let energyPackaging = toscalar(QuerySpecificTime("packaging", productionLineName, "EnergyConsumption", timePartWasProducedPackaging, idealCycleTime));
//
// check each other machine for the time when the product with this serial number was in the machine and get its energy comsumption at that time
let timePartWasProducedTest = toscalar(QuerySpecificValue("test", productionLineName, "ProductSerialNumber", serialNumber));
let energyTest = toscalar(QuerySpecificTime("test", productionLineName, "EnergyConsumption", timePartWasProducedTest, idealCycleTime));
//
let timePartWasProducedAssembly = toscalar(QuerySpecificValue("assembly", productionLineName, "ProductSerialNumber", serialNumber));
let energyAssembly = toscalar(QuerySpecificTime("assembly", productionLineName, "EnergyConsumption", timePartWasProducedAssembly, idealCycleTime));
//
// calculate the total energy consumption for the product by summing up all the machines' energy consumptions (in kW), multiply by 1000 to get Watts and then multiply by the ideal cycle time (which is in seconds) divided by 3600 to get Wh
let totalenergy = (todouble(energyAssembly) + todouble(energyTest) + todouble(energyPackaging)) * 1000 * todouble(format_timespan(idealCycleTime, "s")) / 3600;
print serialNumber, timeLatestProductWasProduced, totalenergy
}