Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2016 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef _NODE_HH_
19 #define _NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <map>
25 #include <list>
26 #include <string>
27 #include <vector>
28 
31 #include "gazebo/util/system.hh"
32 
33 namespace gazebo
34 {
35  namespace transport
36  {
39  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
40  {
44  public: PublishTask(transport::PublisherPtr _pub,
45  const google::protobuf::Message &_message)
46  : pub(_pub)
47  {
48  this->msg = _message.New();
49  this->msg->CopyFrom(_message);
50  }
51 
54  public: tbb::task *execute()
55  {
56  this->pub->WaitForConnection();
57  this->pub->Publish(*this->msg, true);
58  this->pub->SendMessage();
59  delete this->msg;
60  this->pub.reset();
61  return NULL;
62  }
63 
65  private: transport::PublisherPtr pub;
66 
68  private: google::protobuf::Message *msg;
69  };
71 
74 
78  class GZ_TRANSPORT_VISIBLE Node :
79  public boost::enable_shared_from_this<Node>
80  {
82  public: Node();
83 
85  public: virtual ~Node();
86 
91  public: void Init(const std::string &_space ="");
92 
94  public: void Fini();
95 
98  public: std::string GetTopicNamespace() const;
99 
103  public: std::string DecodeTopicName(const std::string &_topic);
104 
108  public: std::string EncodeTopicName(const std::string &_topic);
109 
112  public: unsigned int GetId() const;
113 
116  public: void ProcessPublishers();
117 
119  public: void ProcessIncoming();
120 
124  public: bool HasLatchedSubscriber(const std::string &_topic) const;
125 
126 
133  public: template<typename M>
134  void Publish(const std::string &_topic,
135  const google::protobuf::Message &_message)
136  {
137  transport::PublisherPtr pub = this->Advertise<M>(_topic);
138  PublishTask *task = new(tbb::task::allocate_root())
139  PublishTask(pub, _message);
140 
141  tbb::task::enqueue(*task);
142  return;
143  }
144 
152  public: template<typename M>
153  transport::PublisherPtr Advertise(const std::string &_topic,
154  unsigned int _queueLimit = 1000,
155  double _hzRate = 0)
156  {
157  std::string decodedTopic = this->DecodeTopicName(_topic);
158  PublisherPtr publisher =
159  transport::TopicManager::Instance()->Advertise<M>(
160  decodedTopic, _queueLimit, _hzRate);
161 
162  boost::mutex::scoped_lock lock(this->publisherMutex);
163  publisher->SetNode(shared_from_this());
164  this->publishers.push_back(publisher);
165 
166  return publisher;
167  }
168 
176  public: template<typename M, typename T>
177  SubscriberPtr Subscribe(const std::string &_topic,
178  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
179  bool _latching = false)
180  {
181  SubscribeOptions ops;
182  std::string decodedTopic = this->DecodeTopicName(_topic);
183  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
184 
185  {
186  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
187  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
188  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
189  }
190 
191  SubscriberPtr result =
192  transport::TopicManager::Instance()->Subscribe(ops);
193 
194  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
195 
196  return result;
197  }
198 
205  public: template<typename M>
206  SubscriberPtr Subscribe(const std::string &_topic,
207  void(*_fp)(const boost::shared_ptr<M const> &),
208  bool _latching = false)
209  {
210  SubscribeOptions ops;
211  std::string decodedTopic = this->DecodeTopicName(_topic);
212  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
213 
214  {
215  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
216  this->callbacks[decodedTopic].push_back(
217  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
218  }
219 
220  SubscriberPtr result =
221  transport::TopicManager::Instance()->Subscribe(ops);
222 
223  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
224 
225  return result;
226  }
227 
235  template<typename T>
236  SubscriberPtr Subscribe(const std::string &_topic,
237  void(T::*_fp)(const std::string &), T *_obj,
238  bool _latching = false)
239  {
240  SubscribeOptions ops;
241  std::string decodedTopic = this->DecodeTopicName(_topic);
242  ops.Init(decodedTopic, shared_from_this(), _latching);
243 
244  {
245  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
246  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
247  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
248  }
249 
250  SubscriberPtr result =
251  transport::TopicManager::Instance()->Subscribe(ops);
252 
253  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
254 
255  return result;
256  }
257 
258 
265  SubscriberPtr Subscribe(const std::string &_topic,
266  void(*_fp)(const std::string &), bool _latching = false)
267  {
268  SubscribeOptions ops;
269  std::string decodedTopic = this->DecodeTopicName(_topic);
270  ops.Init(decodedTopic, shared_from_this(), _latching);
271 
272  {
273  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
274  this->callbacks[decodedTopic].push_back(
276  }
277 
278  SubscriberPtr result =
279  transport::TopicManager::Instance()->Subscribe(ops);
280 
281  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
282 
283  return result;
284  }
285 
290  public: bool HandleData(const std::string &_topic,
291  const std::string &_msg);
292 
297  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
298 
305  public: void InsertLatchedMsg(const std::string &_topic,
306  const std::string &_msg);
307 
314  public: void InsertLatchedMsg(const std::string &_topic,
315  MessagePtr _msg);
316 
320  public: std::string GetMsgType(const std::string &_topic) const;
321 
327  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
328 
329  private: std::string topicNamespace;
330  private: std::vector<PublisherPtr> publishers;
331  private: std::vector<PublisherPtr>::iterator publishersIter;
332  private: static unsigned int idCounter;
333  private: unsigned int id;
334 
335  private: typedef std::list<CallbackHelperPtr> Callback_L;
336  private: typedef std::map<std::string, Callback_L> Callback_M;
337  private: Callback_M callbacks;
338  private: std::map<std::string, std::list<std::string> > incomingMsgs;
339 
341  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
342 
343  private: boost::mutex publisherMutex;
344  private: boost::mutex publisherDeleteMutex;
345  private: boost::recursive_mutex incomingMutex;
346 
349  private: boost::recursive_mutex processIncomingMutex;
350 
351  private: bool initialized;
352  };
354  }
355 }
356 #endif
Options for a subscription.
Definition: SubscribeOptions.hh:35
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Adverise a topic.
Definition: Node.hh:153
Forward declarations for the common classes.
Definition: Animation.hh:33
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:236
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
Forward declarations for transport.
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:177
Callback helper Template.
Definition: CallbackHelper.hh:111
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:206
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:78
#define NULL
Definition: CommonTypes.hh:31
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:265
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:134
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36