πΉ04- Introducing ctrlX CORE IDE App
Bosch rexroth
π₯ Introducing ctrlX CORE IDE App
In this video, you will see how we can run Python scripts in ctrlX CORE and trigger the scripts using Node-RED and PLC Engineering
π Resource files used in the video
π Python scripts
import functools import sys
#flush all prints
print = functools.partial(print, flush=True)
#For loop
for x in range(2):
print(x)
sys.exit(0)
#Reading from ctrlX Data Layer
Channel1 = datalayer.read("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/input/data/XI110116/Channel_1.Value")
#Writing to ctrlX Data Layer
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", Channel1)
sys.exit(0)python
import time
import sys
while True:
#Reading Channel 1 status from ctrlX Data Layer
Channel1 = datalayer.read("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/input/data/XI110116/Channel_1.Value")
#Reading Channel 3 status from ctrlX Data Layer
Channel3 = datalayer.read("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/input/data/XI110116/Channel_3.Value")
#blinking loop
if Channel1 is True:
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", True)
time.sleep(1) #delay of 1s
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", True)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", True)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", False)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", False)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", False)
time.sleep(1)
else:
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", False)
if Channel3 is True:
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", False)
sys.exit(0)
import sys
import time
import json
#Empty list
data = []
#Reading PLC variable from ctrlX Data Layer
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
#Average function
def avg(num):
sumOfNumbers = 0
for t in num:
sumOfNumbers = sumOfNumbers + t
avg = sumOfNumbers / len(num)
return avg
#Calculate Max
max_value = max(data)
#Calculate Min
min_value = min(data)
#Calculate Average
avg_value = avg(data)
#JSON to String
max_value_json = json.dumps({"type": "int16", "value": max_value})
min_value_json = json.dumps({"type": "int16", "value": min_value})
avg_value_json = json.dumps({"type": "float", "value": avg_value})
#Writing to PLC variable in ctrlX Data Layer
datalayer.write_json("plc/app/Application/sym/PLC_PRG/ScriptMax.Value",max_value_json )
datalayer.write_json("plc/app/Application/sym/PLC_PRG/ScriptMin.Value",min_value_json )
datalayer.write_json("plc/app/Application/sym/PLC_PRG/ScriptAvg.Value",avg_value_json )
sys.exit(0)
import sys
motion.attach_obj("X")
motion.attach_obj("Y")
motion.attach_obj("Z")
motion.attach_obj("Kinematics")
#Position1
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,0],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 0:
pass
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Position2
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,50],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 50:
pass
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Home position
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-400,-400,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("X").get("pos") != -400:
pass
motion.detach_obj("X")
motion.detach_obj("Y")
motion.detach_obj("Z")
motion.detach_obj("Kinematics")
sys.exit(0)
import time
import paho.mqtt.client as mqtt #import the client1
broker_address="192.168.0.112"
#broker_address="iot.eclipse.org" #use external broker
client = mqtt.Client("ctrlX CORE") #create new instance
#client.username_pw_set("boschrexroth","boschrexroth")
client.connect(broker_address,port=1884) #connect to broker
#client.subscribe("ctrlxon")
client.publish("ctrlxon","Hello ctrlX World")#publish
print("closing")
sys.exit(0)
import sys
import time
import paho.mqtt.client as mqtt_client #import the client1
broker = '192.168.0.112'
port = 1884
topic = "ctrlxon"
client_id = 'ctrlX CORE'
#username = 'boschrexroth'
#password = 'boschrexroth'
motion.attach_obj("X")
motion.attach_obj("Y")
motion.attach_obj("Z")
motion.attach_obj("Kinematics")
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
#client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
#Connect to MQTT Broker
client = connect_mqtt()
client.loop_start()
client.publish(topic, "Program Starts")
#Position1
client.publish(topic, "Going to Position 1")
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,0],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 0:
time.sleep(1)
pass
client.publish(topic,"Picked object")
client.publish(topic,"Going to Position 2")
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Position2
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,50],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 50:
time.sleep(1)
pass
client.publish(topic,"Placed Object")
client.publish(topic,"Going to Home position")
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Home position
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-400,-400,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("X").get("pos") != -400:
time.sleep(1)
pass
client.publish(topic,"Program finished") # working
time.sleep(1)
motion.detach_obj("X")
motion.detach_obj("Y")
motion.detach_obj("Z")
motion.detach_obj("Kinematics")
sys.exit(0)
π PLC Engineering Program
PROGRAM PLC_PRG
VAR
fbIL_ScriptInstance: IL_ScriptInstance;
StateInstance: CXA_PYTHON.INSTANCE_STATE;
bEnableInstance: BOOL;
bInOperationInstance: BOOL;
bErrorInstance: BOOL;
ErrorIDInstance: CXA_PYTHON.ERROR_CODE;
ErrorIdentInstance: CXA_PYTHON.ERROR_STRUCT;
strInstanceName: STRING := 'PLCInstance'; //Name of script instance to be created
bResetInstance: BOOL;
bAbortScript: BOOL;
fbIL_StartScriptFile: IL_StartScriptFile;
bExecuteFile: BOOL;
bDoneFile: BOOL;
bActiveFile: BOOL;
bErrorFile: BOOL;
ErrorIDFile: CXA_PYTHON.ERROR_CODE;
ErrorIdentFile: CXA_PYTHON.ERROR_STRUCT;
strFilePath: STRING(255) := 'activeConfiguration/script/Example2.py'; //Path to the file to be executed e.g. root folder of the active configuration
//strFilePath: STRING(255) := 'activeConfiguration/script/Example4PLC.py'; //Path to the file to be executed e.g. root folder of the active configuration
ParametersFile: ARRAY [0..9] OF STRING;
ScriptOutData : INT;
ScriptMax : INT;
ScriptMin : INT;
ScriptAvg : REAL;
END_VAR
fbIL_ScriptInstance(
Enable:= bEnableInstance,
InOperation=> bInOperationInstance,
Error=> bErrorInstance,
ErrorID=> ErrorIDInstance,
ErrorIdent=> ErrorIdentInstance,
InstanceName:= strInstanceName,
ResetInstance:= bResetInstance,
AbortScript:= bAbortScript,
State=> StateInstance);
fbIL_StartScriptFile(
Execute:= bExecuteFile,
Done=> bDoneFile,
Active=> bActiveFile,
Error=> bErrorFile,
ErrorID=> ErrorIDFile,
ErrorIdent=> ErrorIdentFile,
InstanceName:= strInstanceName,
FileName:= strFilePath,
Parameters:= ParametersFile);
PLC_PRG.ScriptOutData := PLC_PRG.ScriptOutData + 1;
IF PLC_PRG.ScriptOutData >= 200 THEN
PLC_PRG.ScriptOutData := 0;
END_IF
IF PLC_PRG.ScriptOutData <= 100 THEN
PLC_PRG.ScriptOutData := 100;
END_IF
π Node-RED Program
Previous03- Interfacing ctrlX CORE with ctrlX I/O via EtherCATNext05- Connecting OT with IT using Bosch DeviceBridge app
Last updated
Was this helpful?