# Source code for morse.middleware.pocolibs.overlays.rflex_overlay
import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs.actuators.genpos import GenPosPoster
from morse.middleware.pocolibs_datastream import PosterNotFound, DummyPoster
class RflexModule(MorseOverlay):
    """Overlay exposing "rflex"-named services on top of an overlaid object.

    Most services are no-op stubs that accept and ignore their arguments.
    The services that act on the overlaid object are ``Stop``, ``TrackEnd``,
    ``GotoSpeed`` and ``TrackSpeedStart``; they rely on the overlaid object
    providing ``stop()``, ``set_speed(v, w)`` and a list-like
    ``input_functions`` attribute.
    """

    def __init__(self, overlaid_object):
        """Wrap *overlaid_object* and initialise the overlay state."""
        # Call the constructor of the parent class
        MorseOverlay.__init__(self, overlaid_object)
        # True while an input function installed by TrackSpeedStart is
        # (believed to be) present in overlaid_object.input_functions.
        self._clean_track = False
        # The exact callable installed by TrackSpeedStart, so that it can be
        # removed by identity instead of blindly popping the last entry.
        self._track_input = None
        # Value stored by SetMode; initialised here so that GetMode does not
        # raise AttributeError when called before any SetMode.
        self._mode = None
        # NOTE(review): not referenced anywhere else in this class;
        # presumably kept so that a "rflexCntrl" poster exists -- confirm.
        self._cntrl = DummyPoster("rflexCntrl")

    def _remove_track_input(self):
        """Detach the input function installed by TrackSpeedStart, if any.

        Safe to call repeatedly: the tracking state is reset before the
        removal, so a second call does nothing.
        """
        if not self._clean_track:
            return
        self._clean_track = False
        track_input, self._track_input = self._track_input, None
        try:
            self.overlaid_object.input_functions.remove(track_input)
        except ValueError:
            # The callable is already gone (e.g. the list was cleared
            # behind our back); there is nothing left to clean up.
            pass

    def interrupt(self):
        """Stop the overlaid object, drop the tracking input, then delegate.

        The tracking input is only removed when TrackSpeedStart installed
        one; the parent ``MorseOverlay.interrupt`` is always called last.
        """
        self.overlaid_object.stop()
        self._remove_track_input()
        MorseOverlay.interrupt(self)

    @service
    def InitClient(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def EndClient(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetWdogRef(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def GetWdogRef(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetMode(self, mode):
        """Remember *mode* so that GetMode can report it back."""
        self._mode = mode

    @service
    def GetMode(self):
        """Return ``(status.SUCCESS, mode)``.

        *mode* is the value last given to SetMode, or ``None`` when SetMode
        has not been called yet.
        """
        return status.SUCCESS, self._mode

    @service
    def PomTagging(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetPos(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetPosFromMEPoster(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetVarparams(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @async_service
    def Stop(self, *args):
        """Stop the overlaid object and remove all of its input functions.

        Completes immediately with ``status.SUCCESS``. Arguments are ignored.
        """
        self.overlaid_object.stop()
        del self.overlaid_object.input_functions[:]
        # The list is now empty, so any tracking input is gone as well;
        # forget it so that a later interrupt() does not try to remove it.
        self._clean_track = False
        self._track_input = None
        self.completed(status.SUCCESS)

    @async_service
    def TrackEnd(self, *args):
        """Stop the overlaid object and complete with ``status.SUCCESS``.

        Arguments are ignored.
        """
        self.overlaid_object.stop()
        self.completed(status.SUCCESS)

    @async_service
    def GotoSpeed(self, numRef, updatedPeriod, v, vt, w, *args):
        """Apply the speeds *v* and *w* to the overlaid object.

        *v* and *w* are converted with ``float()`` and passed to
        ``overlaid_object.set_speed``. *numRef*, *updatedPeriod*, *vt* and
        any extra arguments are accepted but unused. Completes immediately
        with ``status.SUCCESS``.
        """
        self.overlaid_object.set_speed(float(v), float(w))
        self.completed(status.SUCCESS)

    @interruptible
    @async_service
    def TrackSpeedStart(self, poster_name):
        """Feed the overlaid object from the poster named *poster_name*.

        Builds a ``GenPosPoster`` on that poster and appends its ``default``
        callable to ``overlaid_object.input_functions``. If the poster cannot
        be found, the service completes with ``status.FAILED`` and
        ``["POSTER_NOT_FOUND"]``. On success the service is left running (no
        completion is reported here); ``interrupt`` detaches the callable.
        """
        try:
            poster = GenPosPoster(self.overlaid_object,
                                  {'poster': poster_name, 'delay': False})
        except PosterNotFound:
            return self.completed(status.FAILED, ["POSTER_NOT_FOUND"])

        track_input = poster.default
        self._track_input = track_input
        self._clean_track = True
        self.overlaid_object.input_functions.append(track_input)

    @service
    def GetGeoConfig(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetGeoConfig(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SonarOn(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SonarOff(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def BrakeOn(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def BrakeOff(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def GetJoystick(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def MonitorBattery(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def Gyro(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def GetGyro(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def SetOdometryMethod(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def Log(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    @service
    def StopLog(self, *args):
        """No-op stub: accepts and ignores any arguments."""
        pass

    def name(self):
        """Return the overlay's name, ``"rflex"``."""
        return "rflex"