๐น04- Introducing ctrlX CORE IDE App
Bosch rexroth
Previous03- Interfacing ctrlX CORE with ctrlX I/O via EtherCATNext05- Connecting OT with IT using Bosch DeviceBridge app
Last updated
Bosch rexroth
Last updated
In this video, you will see how we can run Python scripts in ctrlX CORE and trigger the scripts using Node-RED and PLC Engineering
import functools import sys
#flush all prints
print = functools.partial(print, flush=True)
#For loop
for x in range(2):
print(x)
sys.exit(0)
#Reading from ctrlX Data Layer
Channel1 = datalayer.read("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/input/data/XI110116/Channel_1.Value")
#Writing to ctrlX Data Layer
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", Channel1)
sys.exit(0)python
import time
import sys
while True:
#Reading Channel 1 status from ctrlX Data Layer
Channel1 = datalayer.read("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/input/data/XI110116/Channel_1.Value")
#Reading Channel 3 status from ctrlX Data Layer
Channel3 = datalayer.read("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/input/data/XI110116/Channel_3.Value")
#blinking loop
if Channel1 is True:
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", True)
time.sleep(1) #delay of 1s
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", True)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", True)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", False)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", False)
time.sleep(1)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", False)
time.sleep(1)
else:
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", False)
if Channel3 is True:
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_1.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_2.Value", False)
datalayer.write("fieldbuses/ethercat/master/instances/ethercatmaster/realtime_data/output/data/XI211116/Channel_3.Value", False)
sys.exit(0)
import sys
import time
import json
#Empty list
data = []
#Reading PLC variable from ctrlX Data Layer
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
plcdata = datalayer.read("plc/app/Application/sym/PLC_PRG/ScriptOutData.Value")
data.append(plcdata)
time.sleep(0.3)
#Average function
def avg(num):
sumOfNumbers = 0
for t in num:
sumOfNumbers = sumOfNumbers + t
avg = sumOfNumbers / len(num)
return avg
#Calculate Max
max_value = max(data)
#Calculate Min
min_value = min(data)
#Calculate Average
avg_value = avg(data)
#JSON to String
max_value_json = json.dumps({"type": "int16", "value": max_value})
min_value_json = json.dumps({"type": "int16", "value": min_value})
avg_value_json = json.dumps({"type": "float", "value": avg_value})
#Writing to PLC variable in ctrlX Data Layer
datalayer.write_json("plc/app/Application/sym/PLC_PRG/ScriptMax.Value",max_value_json )
datalayer.write_json("plc/app/Application/sym/PLC_PRG/ScriptMin.Value",min_value_json )
datalayer.write_json("plc/app/Application/sym/PLC_PRG/ScriptAvg.Value",avg_value_json )
sys.exit(0)
import sys
motion.attach_obj("X")
motion.attach_obj("Y")
motion.attach_obj("Z")
motion.attach_obj("Kinematics")
#Position1
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,0],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 0:
pass
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Position2
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,50],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 50:
pass
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Home position
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-400,-400,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("X").get("pos") != -400:
pass
motion.detach_obj("X")
motion.detach_obj("Y")
motion.detach_obj("Z")
motion.detach_obj("Kinematics")
sys.exit(0)
import time
import paho.mqtt.client as mqtt #import the client1
broker_address="192.168.0.112"
#broker_address="iot.eclipse.org" #use external broker
client = mqtt.Client("ctrlX CORE") #create new instance
#client.username_pw_set("boschrexroth","boschrexroth")
client.connect(broker_address,port=1884) #connect to broker
#client.subscribe("ctrlxon")
client.publish("ctrlxon","Hello ctrlX World")#publish
print("closing")
sys.exit(0)
import sys
import time
import paho.mqtt.client as mqtt_client #import the client1
broker = '192.168.0.112'
port = 1884
topic = "ctrlxon"
client_id = 'ctrlX CORE'
#username = 'boschrexroth'
#password = 'boschrexroth'
motion.attach_obj("X")
motion.attach_obj("Y")
motion.attach_obj("Z")
motion.attach_obj("Kinematics")
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
#client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
#Connect to MQTT Broker
client = connect_mqtt()
client.loop_start()
client.publish(topic, "Program Starts")
#Position1
client.publish(topic, "Going to Position 1")
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,0],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 0:
time.sleep(1)
pass
client.publish(topic,"Picked object")
client.publish(topic,"Going to Position 2")
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-100,-100,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Position2
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,50],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("Z").get("pos") != 50:
time.sleep(1)
pass
client.publish(topic,"Placed Object")
client.publish(topic,"Going to Home position")
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[250,250,100],coord_sys="PCS",vel=1500,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
#Home position
motion.kin_cmd_move_lin_abs(kin="Kinematics",pos=[-400,-400,100],coord_sys="PCS",vel=2000,acc=1000,dec=1000,jrk_acc=0,jrk_dec=0)
while motion.get_axs_ipo_values("X").get("pos") != -400:
time.sleep(1)
pass
client.publish(topic,"Program finished") # working
time.sleep(1)
motion.detach_obj("X")
motion.detach_obj("Y")
motion.detach_obj("Z")
motion.detach_obj("Kinematics")
sys.exit(0)
PROGRAM PLC_PRG
VAR
fbIL_ScriptInstance: IL_ScriptInstance;
StateInstance: CXA_PYTHON.INSTANCE_STATE;
bEnableInstance: BOOL;
bInOperationInstance: BOOL;
bErrorInstance: BOOL;
ErrorIDInstance: CXA_PYTHON.ERROR_CODE;
ErrorIdentInstance: CXA_PYTHON.ERROR_STRUCT;
strInstanceName: STRING := 'PLCInstance'; //Name of script instance to be created
bResetInstance: BOOL;
bAbortScript: BOOL;
fbIL_StartScriptFile: IL_StartScriptFile;
bExecuteFile: BOOL;
bDoneFile: BOOL;
bActiveFile: BOOL;
bErrorFile: BOOL;
ErrorIDFile: CXA_PYTHON.ERROR_CODE;
ErrorIdentFile: CXA_PYTHON.ERROR_STRUCT;
strFilePath: STRING(255) := 'activeConfiguration/script/Example2.py'; //Path to the file to be executed e.g. root folder of the active configuration
//strFilePath: STRING(255) := 'activeConfiguration/script/Example4PLC.py'; //Path to the file to be executed e.g. root folder of the active configuration
ParametersFile: ARRAY [0..9] OF STRING;
ScriptOutData : INT;
ScriptMax : INT;
ScriptMin : INT;
ScriptAvg : REAL;
END_VAR
fbIL_ScriptInstance(
Enable:= bEnableInstance,
InOperation=> bInOperationInstance,
Error=> bErrorInstance,
ErrorID=> ErrorIDInstance,
ErrorIdent=> ErrorIdentInstance,
InstanceName:= strInstanceName,
ResetInstance:= bResetInstance,
AbortScript:= bAbortScript,
State=> StateInstance);
fbIL_StartScriptFile(
Execute:= bExecuteFile,
Done=> bDoneFile,
Active=> bActiveFile,
Error=> bErrorFile,
ErrorID=> ErrorIDFile,
ErrorIdent=> ErrorIdentFile,
InstanceName:= strInstanceName,
FileName:= strFilePath,
Parameters:= ParametersFile);
PLC_PRG.ScriptOutData := PLC_PRG.ScriptOutData + 1;
IF PLC_PRG.ScriptOutData >= 200 THEN
PLC_PRG.ScriptOutData := 0;
END_IF
IF PLC_PRG.ScriptOutData <= 100 THEN
PLC_PRG.ScriptOutData := 100;
END_IF