|
10鱼币
本帖最后由 香喷喷的咸鱼 于 2022-7-3 16:43 编辑
def order_list_timer_cb(event):
process_order_list()
rospy.Timer(rospy.Duration(1), order_list_timer_cb)
代码如上形式
第三行为每秒调用一次order_list_timer_cb这个函数
第一行中order_list_timer_cb的传入参数为event,找不到它的用法,还是说这是一个默认的形式,想问有见过这样用的人没?
以下是代码
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- #import json # Used for reading JSON files (loading jobs to JobQueue)
- #import os # Used to get base filename and file and directory handling
- import rospy
- #import actionlib
- #from JobManager.Tasks import TaskStatus, TaskType, RobotMoveBase, AwaitingLoadCompletion
- from typing import Union, Any
- from JobManager.Job import JobStatus, Job, JobPriority
- from JobManager.Location import Location, make_location_dict
- from JobManager.JobBuilder import job_builder
- from JobManager.JobActivation import job_allocator, job_refiner
- from JobManager.MobileExecutor import MExStatus, MobileExecutor
- from JobManager.Order import *
- from JobManager.JobServiceMethods import call_get_mex_list, call_unassign_job
- from rooster_fleet_manager.srv import PlaceOrder, PlaceOrderResponse, GetPendingJobs, GetPendingJobsResponse, \
- GetActiveJobs, GetActiveJobsResponse, GetJobInfo, GetJobInfoResponse, \
- GetMexList, GetMexListResponse, GetMexListRequest, AssignJobToMex, AssignJobToMexRequest
- from rooster_fleet_manager.msg import PendingJob, ActiveJob, JobInfo, TaskInfo
- NODE_NAME = "[job_manager] "
- #region Service callback definitions
- # - PlaceOrder service callback -
- def order_service_cb(request):
- """
- PlaceOrder service callback function.
- Takes in a PlaceOrderRequest request with a order keyword, priority and arguments.
- Processes these and adds the processed order in list form to the order_list.
- Returns a PlaceOrderResponse response with information on if the call failed and why.
- """
- print(NODE_NAME + "Order service has been called with: " + str(request))
- # print(NODE_NAME + "Arguments: ", request.order_args, "List size: ", len(request.order_args) )
- order_response = PlaceOrderResponse()
- # NOTE RUDIMENTARY ORDER PROCESSING, RIGID IN NATURE.
- if request.keyword == OrderKeyword.TRANSPORT.name: # Keyword: TRANSPORT.
- # Expecting: transport, priority, [from_location, to_location]
- if len(request.order_args) == OrderTypeArgCount.TRANSPORT.value:
- priority = JobPriority[request.priority].value # Check if it's >= 1 and <= 4.
- from_loc = request.order_args[0] # Check if it's in the known locations dictionary
- to_loc = request.order_args[1] # Check if it's in the known locations dictionary
- if (priority >= 1 and priority <= 4) and (from_loc in location_dict) and (to_loc in location_dict):
- # Succesful check on keyword and arguments, add order to order_list.
- order_response.error_status = OrderResponseStatus.SUCCES.name
- order_response.error_report = ""
- order_list.append([request.keyword, JobPriority(priority), from_loc, to_loc])
- else:
- # Error occured, set status to ERROR and supply erorr report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = \
- "[TRANSPORT] Invalid priority (" + str(not (priority >= 1 and priority <= 4)) + \
- ") or location argument (" + \
- str(not ((from_loc in location_dict) and (to_loc in location_dict))) + \
- ")."
- else:
- # Error occured, set status to ERROR and supply erorr report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[TRANSPORT] Invalid number of arguments, expected " + \
- str(OrderTypeArgCount.TRANSPORT.value) + \
- ", received " + str(len(request.order_args))
-
- elif request.keyword == OrderKeyword.MOVE.name: # Keyword: MOVE.
- # Expecting: move, priority, [to_location]
- if len(request.order_args) == OrderTypeArgCount.MOVE.value:
- priority = JobPriority[request.priority].value # Check if it's >= 1 and <= 4.
- to_loc = request.order_args[0] # Check if it's in the known locations dictionary
- if (priority >= 1 and priority <= 4) and (to_loc in location_dict):
- # Succesful check on keyword and arguments, add order to order_list.
- order_response.error_status = OrderResponseStatus.SUCCES.name
- order_response.error_report = ""
- order_list.append([request.keyword, JobPriority(priority), to_loc])
- else:
- # Error occured, set status to ERROR and supply error report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[MOVE] Invalid priority (" + \
- str(not (priority >= 1 and priority <= 4)) + \
- ") or location argument (" + \
- str(not (to_loc in location_dict) ) + ")."
- else:
- # Error occured, set status to ERROR and supply erorr report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[MOVE] Invalid number of arguments, expected " + \
- str(OrderTypeArgCount.MOVE.value) + \
- ", received " + str(len(request.order_args))
- elif request.keyword == OrderKeyword.LOAD.name: # Keyword: LOAD
- # Expecting: load, priority, []
- if len(request.order_args) == OrderTypeArgCount.LOAD.value:
- priority = JobPriority[request.priority].value # Check if it's >= 1 and <= 4.
- if (priority >= 1 and priority <= 4):
- # Succesful check on keyword and arguments, add order to order_list.
- order_response.error_status = OrderResponseStatus.SUCCES.name
- order_response.error_report = ""
- order_list.append([request.keyword, JobPriority(priority)])
- else:
- # Error occured, set status to ERROR and supply error report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[LOAD] Invalid priority."
- else:
- # Error occured, set status to ERROR and supply error report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[LOAD] Invalid number of arguments, expected " + \
- str(OrderTypeArgCount.LOAD.value) + \
- ", received " + str(len(request.order_args))
- elif request.keyword == OrderKeyword.UNLOAD.name: # Keyword: UNLOAD
- # Expecting: load, priority, []
- if len(request.order_args) == OrderTypeArgCount.UNLOAD.value:
- priority = JobPriority[request.priority].value # Check if it's >= 1 and <= 4.
- if (priority >= 1 and priority <= 4):
- # Succesful check on keyword and arguments, add order to order_list.
- order_response.error_status = OrderResponseStatus.SUCCES.name
- order_response.error_report = ""
- order_list.append([request.keyword, JobPriority(priority)])
- else:
- # Error occured, set status to ERROR and supply error report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[LOAD] Invalid priority."
- else:
- # Error occured, set status to ERROR and supply error report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "[LOAD] Invalid number of arguments, expected " + \
- str(OrderTypeArgCount.UNLOAD.value) + \
- ", received " + str(len(request.order_args))
- else:
- # Error occured, set status to ERROR and supply erorr report.
- order_response.error_status = OrderResponseStatus.ERROR.name
- order_response.error_report = "Invalid keyword." # Could not interpret order keyword.
- # print(NODE_NAME + "Order service is done, current order_list: " + str(order_list))
- return order_response # the service Response class, in this case PlaceOrderResponse
- # - GetPendingJobs service callback -
- def get_pending_jobs_service_cb(request):
- """
- GetPendingJobs service callback function.
- Takes in a empty request.
- Returns a response with a list of all Pending Jobs and some basic information.
- """
- # print(NODE_NAME + "A service request for the Pending Jobs List has been received.")
- pending_jobs_response = GetPendingJobsResponse()
- pending_jobs_response.jobs_count = len(pending_job_list)
- for job in pending_job_list:
- pending_job = PendingJob()
- pending_job.priority = job.priority.name
- pending_job.job_id = job.id
- pending_job.task_count = job.task_count
- pending_job.keyword = job.keyword
- pending_jobs_response.jobs.append(pending_job)
- return pending_jobs_response
- # - GetActiveJobs service callback -
- def get_active_jobs_service_cb(request):
- """
- GetActiveJobs service callback function.
- Takes in a empty request.
- Returns a response with a list of all Active Jobs and all basic information.
- """
- # print(NODE_NAME + "A service request for the Active Jobs List has been received.")
- active_jobs_response = GetActiveJobsResponse() #数据类型,被激活的任务数量
- active_jobs_response.jobs_count = len(active_job_list)
- for job in active_job_list: #多个任务的详细信息被存储
- active_job = ActiveJob()
- active_job.job_id = job.id
- active_job.status = job.status.name
- active_job.priority = job.priority.name
- active_job.mex_id = job.mex_id
- active_job.task_count = job.task_count
- active_job.current_task = job.task_current
- active_job.keyword = job.keyword
- active_jobs_response.jobs.append(active_job)
- return active_jobs_response
- # - GetJobInfo service callback -
- def get_job_info_service_cb(request):
- """
- GetJobInfo service callback function.
- Takes in a request with a Job ID.
- Returns a response with all information on the Job
- matching the call ID, including a list of the Job's Tasks.
- """
- job_id_to_find = request.job_id
- print(NODE_NAME + "A service request for the Get Job Info has been received for " + job_id_to_find + ".")
- job_info_response = GetJobInfoResponse()
- found_job = None
- for job in pending_job_list:
- if job.id == job_id_to_find:
- found_job = job
- break
- else:
- for job in active_job_list:
- if job.id == job_id_to_find:
- found_job = job
- break
- else:
- job_info_response.error_status = "ERROR" # TODO Replace with error messages enum.
- job_info_response.error_report = "Could not find " + job_id_to_find + " in pending or active job lists."
-
- if found_job != None:
- job_information = JobInfo()
- job_information.job_id = found_job.id
- job_information.status = found_job.status.name
- job_information.priority = found_job.priority.name
- job_information.mex_id = found_job.mex_id if not found_job.mex_id == None else ""
- job_information.task_count = found_job.task_count
- job_information.current_task = found_job.task_current if not found_job.task_current == None else 0
- job_information.keyword = found_job.keyword
- for task in found_job.task_list:
- task_info = TaskInfo()
- task_info.status = task.status.name
- task_info.type = task.type.name
- job_information.task_list.append(task_info)
- job_info_response.job_info = job_information
- job_info_response.error_status = "SUCCES"
- return job_info_response
- #endregion
- #region Timer callback definitons
- # - Order list timer callback -
- def order_list_timer_cb(event):
- """ Order list timer callback function. Calls process_order_list function. """
- print(NODE_NAME + "Order list timer callback, processing order_list and building rough jobs.")
- process_order_list()
- # - Job allocator timer callback -
- def job_allocator_timer_cb(event):
- """ Job allocator timer callback function. Calls job allocator function. """
- #删除choose后,根据job类型选择不同种类mex
- #if job_allocator(pending_jobs_list=pending_job_list, active_jobs_list=active_job_list, mexs_list=mex_list,choose="rdg") != 0:
- #1.有job被分配:job_allocator = 0
- #2.无job被分配:job_allocator !=0
- #无论有无job被分配,都执行该函数即可
- if job_allocator(pending_jobs_list=pending_job_list, active_jobs_list=active_job_list, mexs_list=mex_list) != 0:
- # rospy.loginfo("Failed to allocate job.")
- pass
- #endregion
- #region Job completion callback definition
- def job_completion_cb(job_id, mex_id):
- """
- Job completion callback function, gets called when a
- Jobs is finished (succesful, cancel, error or abort).
- It is attached to a Job in the job_builder function.
- """
- print(NODE_NAME + "Job called the job_completion_cb function: " + str(job_id) + ", " + str(mex_id))
- # First send update to MEx Sentinel to unassign job.
- call_unassign_job(mex_id=mex_id)
- # Then call MEx Sentinel to provide latest MEx List.
- mex_list = call_get_mex_list()
- # Remove completed Job from the active_job_list.
- index_to_pop = None
- for index, job in enumerate(active_job_list):
- if job.id == job_id:
- index_to_pop = index
- break
- else:
- # Couldn't find job_id in active_job_list... Handle this?
- pass
- if not index_to_pop == None:
- active_job_list.pop(index_to_pop)
- #endregion
- def process_order_list():
- """
- Build jobs from all orders in the order_list by calling
- the job_builder function, then clear the order_list.
- """
- global job_index #设置全局变量,job的索引
- for order in order_list: #每一个order,即添加一个job
- job_index = job_builder(pending_jobs_list=pending_job_list, order=order, job_index=job_index, location_dict=location_dict, completion_cb=job_completion_cb) # type: Union[int, Any]
- del order_list[:]
- if __name__ == '__main__':
- try:
- # Retrieve robots and set up a list of available MobileExecutor (MEx) instances
- #加载了所有mex,存至mex_list
- mex_list = call_get_mex_list()
- # for robot in robot_namespaces:
- # mex_list.append(MobileExecutor(robot))
- for mex in mex_list:
- print(NODE_NAME + str( (mex.id, mex.status, mex.job_id) ) )
-
- # Lists for Orders, PendingJobs, ActiveJobs:
- order_list = []
- pending_job_list = []
- active_job_list = []
- job_index = 1
- # Testing the adding of multiple Location class instances, storing them in a dictionary.
- # TODO Replace this with a dynamic(?) dictionary which is constructed from the MEx Sentinel.
- location_dict = make_location_dict()
-
- # Initialize the node.
- rospy.init_node('job_manager')
- ####################以上皆是初始化#########################
- # Initialize services.
- order_service = rospy.Service('~place_order', PlaceOrder , order_service_cb) #服务名:/job_manager/place_order
- get_pending_jobs_service = rospy.Service('~get_pending_jobs', GetPendingJobs, get_pending_jobs_service_cb)
- get_active_jobs_service = rospy.Service('~get_active_jobs', GetActiveJobs, get_active_jobs_service_cb)
- get_job_info_service = rospy.Service('~get_job_info', GetJobInfo, get_job_info_service_cb)
- # Initialize timers for the JobBuilder, and JobAllocator.
- rospy.Timer(rospy.Duration(1), order_list_timer_cb) # Every second check order list to try build rough jobs.
- rospy.Timer(rospy.Duration(5), job_allocator_timer_cb) # Call the job_allocator in 5 seconds to try allocating jobs.
- # The above job_allocator timer should be replaced with something more elegant...
- # Keep node running.
- rospy.spin()
- except rospy.ROSInterruptException:
- pass
复制代码 |
|