import time
from pyDXL import *
dx = DXLProtocolV2('/dev/ttyAMA0', 57600, 0.05)
ID = 1
dx.Write8(ID, 512, 0)
if input('Clear? (y/N) ') == 'y':
dx.TxPacket(ID, DXLProtocolV2.INST_CLEAR, (0x01, 0x44, 0x58, 0x4c, 0x22))
dx.RxPacket(timeout = 5.0)
time.sleep(1)
dx.Write8(ID, 33, 1)
dx.Write8(ID, 512, 1)
for i in range(-550000, 550000, 550000 >> 4):
dx.Write32(ID, 528, i)
time.sleep(0.5)
dx.Write8(ID, 512, 0)
import time
from pyDXL import *
dx = DXLProtocolV2('/dev/ttyAMA0', 57600, 0.05)
ID = 1
dx.Write8(ID, 512, 0)
if input('Clear? (y/N) ') == 'y':
dx.TxPacket(ID, DXLProtocolV2.INST_CLEAR, (0x01, 0x44, 0x58, 0x4c, 0x22))
dx.RxPacket(timeout = 5.0)
time.sleep(1)
dx.Write8(ID, 33, 3)
dx.Write8(ID, 512, 1)
for i in range(-262144, 262144, 512):
dx.Write32(ID, 532, i)
dx.Write8(ID, 512, 0)
#!/usr/bin/python3
#
# Link two servos with virtual wires
import os, platform, psutil, time, kbhit, serial, numpy as np
from PeriodicTimer import PeriodicTimer
from jp200class import packet
intervalms = 20
try:
pr = psutil.Process(os.getpid())
if platform.system() == 'Windows':
pr.nice(psutil.HIGH_PRIORITY_CLASS)
else:
pr.nice(-10)
except:
print('info:run with sudo')
# present value
present_dt = [('ang',float),('cur',float),('velo',float)]
present = np.empty(2, dtype = present_dt)
# calculated current command value
tcur = [0.0] * 2
#--------------------------------
# Periodically executed functioin
#--------------------------------
def cycfunc(*p):
sv = p[0]
acks = []
# deviation from each other's position
tcur = (
np.clip((present[1]['ang'] - present[0]['ang']) * 0.15, -1000.0, 1000.0),
np.clip((present[0]['ang'] - present[1]['ang']) * 0.15, -1000.0, 1000.0)
)
# send current command and current position acquisition to JP200
sv.request(
'#0EX=4SG2=9000;5500;10TC=%dTP=1000CACVCC'
'#1EX=4SG2=9000;5500;10TC=%dTP=1000CACVCC'
%(tcur[0], tcur[1]), acks
)
# analyze response
if acks:
for ack in acks:
m = sv.p2.findall(ack)
id = -1
for n in m:
try:
if n[0] == '#' and int(n[1]) >= 0: id = int(n[1])
elif n[0] == 'CA' and id >= 0: present[id]['ang'] = float(n[1])
elif n[0] == 'CC' and id >= 0: present[id]['cur'] = float(n[1])
elif n[0] == 'CV' and id >= 0: present[id]['velo'] = float(n[1])
except:
pass
if id >= 0: print('[%d] %4d %8d %5d %5d '%(id, tcur[id], present[id]['ang'], present[id]['velo'], present[id]['cur']), end = '')
print(end = '\r')
# instantiate kbhit
kb = kbhit.KBHit()
# open UART
ser = serial.Serial('/dev/ttyAMA0', 2000000)
print(ser.name)
# instantiate JP200
sv = packet(ser, 0.02)
# instantiate periodic timer & start
tm = PeriodicTimer(intervalms / 1000, cycfunc, [sv])
acks = []
k = ''
while k != 'q':
# waiting for key input
try:
k = kb.getch()
except UnicodeDecodeError:
pass
print(k, end = '\n', flush = True)
# move to origin position
if k == 'h':
tm.stop()
sv.request('#0EX=1TA=18000TP=1000#1EX=1TA=18000TP=1000', acks)
# stop timer
elif k == 's':
tm.stop()
tm.start()
# start timer
elif k == 'e':
tm.stop()
sv.request('#0EX=0#1EX=0', acks)
elif k == 'c':
tm.stop()
sv.request('#0EX=8TP=1000#1EX=8TP=1000', acks)
time.sleep(5)
sv.request('#0EX=8TP=-1000#1EX=8TP=-1000', acks)
time.sleep(5)
sv.request('#0EX=0#1EX=0', acks)
else:
print('XXXXXXXXX')
tm.stop()
sv.request('#0EX=0#1EX=0', acks, True)
ser.close()
kb.set_normal_term()
del sv, ser, kb
os._exit(0)
#!/usr/bin/python3
from pyPMX import *
from pyDXL import *
import math, kbhit, time, serial, threading
# Communication task with PMX
def func1(arg):
id = 0
# Class instantiation
pmx = PMXProtocol(arg[0], lock = arg[1])
# Setting initial conditions
pmx.MemREAD(id, 400, 6) # Clear error flag
pmx.MotorWRITE(id, 2, ()) # Torque free
pmx.MemWRITE8(id, 501, (1, 0b11111)) # Control mode (All)
pmx.MotorWRITE(id, 1, ()) # Torque enable
while arg[2]:
ang = int(math.sin(time.time()) * 9000.0)
r = pmx.MotorWRITE(id, 0, (ang,)) # Angle command
if r != None:
arg[3] = r[1][0]
time.sleep(0.001)
pmx.MotorWRITE(id, 2, ()) # Torque free
del pmx
# Communication task with DXL
def func2(arg):
id = 1
cnt = 0
led = 0
# Class instantiation
dx2 = DXLProtocolV2(arg[0], lock = arg[1])
# Setting initial conditions
dx2.Write8(id, 64, 1) # Torque enable
while arg[2]:
ang = int(math.sin(time.time()) * 1024.0) + 2047
dx2.Write32(id, 116, ang) # Angle command
r = dx2.Read32(id, 132, signed = True) # Read feedback values
if r != None:
arg[4] = r
cnt += 1
if cnt % 10 == 0:
led ^= 1
dx2.Write8(id, 65, led) # Update LED status
time.sleep(0.001)
dx2.Write8(id, 65, 0) # LED off
dx2.Write8(id, 64, 0) # Torque free
del dx2
kb = kbhit.KBHit()
# Open serial port
com = serial.Serial('/dev/ttyAMA0', 1000000, timeout=0.005)
lock = threading.Lock()
act = True
v1 = 0
v2 = 0
SharedVar = [com, lock, act, v1, v2]
# Start thread
th1 = threading.Thread(target = func1, args = [SharedVar])
th2 = threading.Thread(target = func2, args = [SharedVar])
th1.start()
th2.start()
# Loop until keyboard input
while True:
if kb.kbhit():
break
print(f' pmx:{SharedVar[3]:6d}, dxl:{SharedVar[4]:6d}', end = '\r')
time.sleep(0.05)
# End
SharedVar[2] = False
th1.join()
th2.join()
del kb, lock, com, th1, th2
print()