1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import time from pyDXL import * dx = DXLProtocolV2( '/dev/ttyAMA0' , 57600 , 0.05 ) ID = 1 dx.Write8( ID , 512 , 0 ) if input ( 'Clear? (y/N) ' ) = = 'y' : dx.TxPacket( ID , DXLProtocolV2.INST_CLEAR, ( 0x01 , 0x44 , 0x58 , 0x4c , 0x22 )) dx.RxPacket(timeout = 5.0 ) time.sleep( 1 ) dx.Write8( ID , 33 , 1 ) dx.Write8( ID , 512 , 1 ) for i in range ( - 550000 , 550000 , 550000 >> 4 ): dx.Write32( ID , 528 , i) time.sleep( 0.5 ) dx.Write8( ID , 512 , 0 ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import time from pyDXL import * dx = DXLProtocolV2( '/dev/ttyAMA0' , 57600 , 0.05 ) ID = 1 dx.Write8( ID , 512 , 0 ) if input ( 'Clear? (y/N) ' ) = = 'y' : dx.TxPacket( ID , DXLProtocolV2.INST_CLEAR, ( 0x01 , 0x44 , 0x58 , 0x4c , 0x22 )) dx.RxPacket(timeout = 5.0 ) time.sleep( 1 ) dx.Write8( ID , 33 , 3 ) dx.Write8( ID , 512 , 1 ) for i in range ( - 262144 , 262144 , 512 ): dx.Write32( ID , 532 , i) dx.Write8( ID , 512 , 0 ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | #!/usr/bin/python3 # # Link two servos with virtual wires import os, platform, psutil, time, kbhit, serial, numpy as np from PeriodicTimer import PeriodicTimer from jp200class import packet intervalms = 20 try : pr = psutil.Process(os.getpid()) if platform.system() = = 'Windows' : pr.nice(psutil.HIGH_PRIORITY_CLASS) else : pr.nice( - 10 ) except : print ( 'info:run with sudo' ) # present value present_dt = [( 'ang' , float ),( 'cur' , float ),( 'velo' , float )] present = np.empty( 2 , dtype = present_dt) # calculated current command value tcur = [ 0.0 ] * 2 #-------------------------------- # Periodically executed functioin #-------------------------------- def cycfunc( * p): sv = p[ 0 ] acks = [] # deviation from each other's position tcur = ( np.clip((present[ 1 ][ 'ang' ] - present[ 0 ][ 'ang' ]) * 0.15 , - 1000.0 , 1000.0 ), np.clip((present[ 0 ][ 'ang' ] - present[ 1 ][ 'ang' ]) * 0.15 , - 1000.0 , 1000.0 ) ) # send current command and current position acquisition to JP200 sv.request( '#0EX=4SG2=9000;5500;10TC=%dTP=1000CACVCC' '#1EX=4SG2=9000;5500;10TC=%dTP=1000CACVCC' % (tcur[ 0 ], tcur[ 1 ]), acks ) # analyze response if acks: for ack in acks: m = sv.p2.findall(ack) id = - 1 for n in m: try : if n[ 0 ] = = '#' and int (n[ 1 ]) > = 0 : id = int (n[ 1 ]) elif n[ 0 ] = = 'CA' and id > = 0 : present[ id ][ 'ang' ] = float (n[ 1 ]) elif n[ 0 ] = = 'CC' and id > = 0 : present[ id ][ 'cur' ] = float (n[ 1 ]) elif n[ 0 ] = = 'CV' and id > = 0 : present[ id ][ 'velo' ] = float (n[ 1 ]) except : pass if id > = 0 : print ( '[%d] %4d %8d %5d %5d ' % ( id , tcur[ id ], present[ id ][ 'ang' ], present[ id ][ 'velo' ], present[ id ][ 'cur' ]), end = '') print (end = '\r' ) # instantiate kbhit kb = kbhit.KBHit() # open UART ser = serial.Serial( '/dev/ttyAMA0' , 2000000 ) print (ser.name) # instantiate JP200 sv = packet(ser, 0.02 ) # instantiate periodic timer & start tm = PeriodicTimer(intervalms / 1000 , cycfunc, [sv]) acks = [] k = '' while k ! = 'q' : # waiting for key input try : k = kb.getch() except UnicodeDecodeError: pass print (k, end = '\n' , flush = True ) # move to origin position if k = = 'h' : tm.stop() sv.request( '#0EX=1TA=18000TP=1000#1EX=1TA=18000TP=1000' , acks) # stop timer elif k = = 's' : tm.stop() tm.start() # start timer elif k = = 'e' : tm.stop() sv.request( '#0EX=0#1EX=0' , acks) elif k = = 'c' : tm.stop() sv.request( '#0EX=8TP=1000#1EX=8TP=1000' , acks) time.sleep( 5 ) sv.request( '#0EX=8TP=-1000#1EX=8TP=-1000' , acks) time.sleep( 5 ) sv.request( '#0EX=0#1EX=0' , acks) else : print ( 'XXXXXXXXX' ) tm.stop() sv.request( '#0EX=0#1EX=0' , acks, True ) ser.close() kb.set_normal_term() del sv, ser, kb os._exit( 0 ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | #!/usr/bin/python3 from pyPMX import * from pyDXL import * import math, kbhit, time, serial, threading # Communication task with PMX def func1(arg): id = 0 # Class instantiation pmx = PMXProtocol(arg[ 0 ], lock = arg[ 1 ]) # Setting initial conditions pmx.MemREAD( id , 400 , 6 ) # Clear error flag pmx.MotorWRITE( id , 2 , ()) # Torque free pmx.MemWRITE8( id , 501 , ( 1 , 0b11111 )) # Control mode (All) pmx.MotorWRITE( id , 1 , ()) # Torque enable while arg[ 2 ]: ang = int (math.sin(time.time()) * 9000.0 ) r = pmx.MotorWRITE( id , 0 , (ang,)) # Angle command if r ! = None : arg[ 3 ] = r[ 1 ][ 0 ] time.sleep( 0.001 ) pmx.MotorWRITE( id , 2 , ()) # Torque free del pmx # Communication task with DXL def func2(arg): id = 1 cnt = 0 led = 0 # Class instantiation dx2 = DXLProtocolV2(arg[ 0 ], lock = arg[ 1 ]) # Setting initial conditions dx2.Write8( id , 64 , 1 ) # Torque enable while arg[ 2 ]: ang = int (math.sin(time.time()) * 1024.0 ) + 2047 dx2.Write32( id , 116 , ang) # Angle command r = dx2.Read32( id , 132 , signed = True ) # Read feedback values if r ! = None : arg[ 4 ] = r cnt + = 1 if cnt % 10 = = 0 : led ^ = 1 dx2.Write8( id , 65 , led) # Update LED status time.sleep( 0.001 ) dx2.Write8( id , 65 , 0 ) # LED off dx2.Write8( id , 64 , 0 ) # Torque free del dx2 kb = kbhit.KBHit() # Open serial port com = serial.Serial( '/dev/ttyAMA0' , 1000000 , timeout = 0.005 ) lock = threading.Lock() act = True v1 = 0 v2 = 0 SharedVar = [com, lock, act, v1, v2] # Start thread th1 = threading.Thread(target = func1, args = [SharedVar]) th2 = threading.Thread(target = func2, args = [SharedVar]) th1.start() th2.start() # Loop until keyboard input while True : if kb.kbhit(): break print (f ' pmx:{SharedVar[3]:6d}, dxl:{SharedVar[4]:6d}' , end = '\r' ) time.sleep( 0.05 ) # End SharedVar[ 2 ] = False th1.join() th2.join() del kb, lock, com, th1, th2 print () |