QueueManager.hpp
Go to the documentation of this file.
1 /*----------------------------------------------------------------------------*/
2 /* */
3 /* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */
4 /* Copyright (c) 2005-2009 Rexx Language Association. All rights reserved. */
5 /* */
6 /* This program and the accompanying materials are made available under */
7 /* the terms of the Common Public License v1.0 which accompanies this */
8 /* distribution. A copy is also available at the following address: */
9 /* http://www.ibm.com/developerworks/oss/CPLv1.0.htm */
10 /* */
11 /* Redistribution and use in source and binary forms, with or */
12 /* without modification, are permitted provided that the following */
13 /* conditions are met: */
14 /* */
15 /* Redistributions of source code must retain the above copyright */
16 /* notice, this list of conditions and the following disclaimer. */
17 /* Redistributions in binary form must reproduce the above copyright */
18 /* notice, this list of conditions and the following disclaimer in */
19 /* the documentation and/or other materials provided with the distribution. */
20 /* */
21 /* Neither the name of Rexx Language Association nor the names */
22 /* of its contributors may be used to endorse or promote products */
23 /* derived from this software without specific prior written permission. */
24 /* */
25 /* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */
26 /* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */
27 /* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */
28 /* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */
29 /* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
30 /* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */
31 /* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */
32 /* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */
33 /* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */
34 /* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */
35 /* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
36 /* */
37 /*----------------------------------------------------------------------------*/
38 
39 #ifndef QueueManager_HPP_INCLUDED
40 #define QueueManager_HPP_INCLUDED
41 
42 #include "ServiceMessage.hpp"
43 #include "SysSemaphore.hpp"
44 #include "SysThread.hpp"
45 
46 class APIServer;
47 class ServerQueueManager;
48 
// One element of a data queue: a pointer to the element bytes, their size,
// a link to the following item, and a RexxQueueTime stamp. DataQueue is a
// friend so it can reach the protected link and data members.
49 class QueueItem
50 {
51  friend class DataQueue;
52 public:
    // Build an item around an existing buffer.
    //   data - pointer to the element bytes; the pointer itself is stored
    //   s    - size of the element data
53  QueueItem(const char *data, size_t s)
54  {
55  next = NULL;
56  // the buffer pointer is kept as-is; no copy of the data is made here
57  elementData = data;
58  size = s;
    // setTime() is only declared in this class; presumably it fills addTime
59  setTime();
60  }
61 
    // NOTE(review): listing line 62 (the declaration that owns the body
    // below) is absent from this extract, as is line 69 (the only statement
    // inside the NULL check). Restore both from the original header.
63  {
64  if (elementData != NULL)
65  {
66  // make sure we release this too. This was allocated by the
67  // incoming message, so we need to use the other mechanism for
68  // releasing this memory
70  }
71  }
72 
    // declared here, defined elsewhere
73  void setTime();
74 
75  // The data is being handed back to someone else, so just detach the
    // buffer: the pointer is nulled and the size zeroed, nothing is released.
76  inline void clear()
77  {
78  elementData = NULL;
79  size = 0;
80  }
81 
82 protected:
83 
84  QueueItem *next; // next item in the queue
85  const char *elementData; // the element data
86  size_t size; // size of the element data
87  RexxQueueTime addTime; // time the element was added
88 };
89 
// A queue of QueueItem elements (firstItem/lastItem) plus the bookkeeping
// around it: an item count, a count of waiters, a reference count, and a
// semaphore used to signal a wait for an item. Queues are chained through
// `next`; QueueTable is a friend so it can reach the protected members.
90 class DataQueue
91 {
92  friend class QueueTable;
93 public:
    // All three constructors store ds/di in dbgds/dbgdi, name the semaphore
    // "DataQueue::waitSem", and run init() before any further setup.
    // NOTE(review): ds/di look like debug identification values - confirm.
94  DataQueue(const char *ds, int di) : waitSem("DataQueue::waitSem"), dbgds(ds), dbgdi(di)
95  {
96  init(); // do common initialization
97  }
98 
    // As above, then records the session id s. The assignment follows init()
    // because init() resets session to 0.
99  DataQueue(SessionID s, const char *ds, int di) : waitSem("DataQueue::waitSem"), dbgds(ds), dbgdi(di)
100  {
101  init(); // do common initialization
102  session = s;
103  }
104 
    // As above, then sets the queue name. setName() follows init() because
    // init() resets queueName to NULL.
105  DataQueue(const char *name, const char *ds, int di) : waitSem("DataQueue::waitSem"), dbgds(ds), dbgdi(di)
106  {
107  init(); // do common initialization
108  setName(name);
109  }
110 
111  ~DataQueue();
112 
    // Store the result of dupString(name) as the queue name.
    // NOTE(review): any previous queueName value is overwritten here without
    // being released - confirm callers only set the name once.
113  inline void setName(const char *name)
114  {
115  queueName = dupString(name);
116  }
117 
    // Declared here, defined elsewhere.
118  void add(ServiceMessage &message);
119  void addLifo(QueueItem *item);
120  void addFifo(QueueItem *item);
121  void clear();
122  QueueItem *getFirst();
123 
    // bump the count of waiters
124  inline void addWaiter()
125  {
126  waiters++;
127  }
128 
    // drop the count of waiters; no guard against going below zero here
129  inline void removeWaiter()
130  {
131  waiters--;
132  }
133 
134  // check to see if we have processes waiting on the queue, and wake them
135  // up to get an item (posts waitSem only when waiters > 0).
136  inline void checkWaiters()
137  {
138  if (waiters > 0)
139  {
140  waitSem.post();
141  }
142  }
143 
    // NOTE(review): listing line 146, the only statement of this body, is
    // absent from this extract. Restore it from the original header.
144  inline void waitForData()
145  {
147  }
148 
    // true when at least one waiter is registered
149  inline bool hasWaiters()
150  {
151  return waiters > 0;
152  }
153 
    // Declared here, defined elsewhere.
154  void pull(ServerQueueManager *manager, ServiceMessage &message);
155  bool pullData(ServerQueueManager *manager, ServiceMessage &message);
156 
    // Reference counting: removeReference() returns the count after the
    // decrement; hasReferences() reports whether the count is non-zero.
157  inline void addReference() { references++; }
158  inline size_t removeReference() { return --references; }
159  inline bool hasReferences() { return references != 0; }
160 
    // Common constructor setup: empties the chain link and item pointers,
    // zeroes the counters and session, starts the reference count at 1,
    // and creates the wait semaphore.
161  void init()
162  {
163  next = NULL;
164  itemCount = 0;
165  waiters = 0;
166  references = 1;
167  waitSem.create();
168  firstItem = NULL;
169  lastItem = NULL;
170  queueName = NULL;
171  session = 0;
172  }
173 
    // current value of itemCount
174  size_t getItemCount() { return itemCount; }
175 
176 protected:
177 
178  DataQueue *next; // next item in the chain
179  size_t itemCount; // number of items in the queue
180  size_t waiters; // number of processes waiting on a queue item
181  size_t references; // number of nested references to queue
182  SysSemaphore waitSem; // used to signal wait for item
183  QueueItem *firstItem; // first queue item
184  QueueItem *lastItem; // last queue item
185  const char *queueName; // pointer to queue name
186  SessionID session; // session of queue
187  const char *dbgds; // first extra constructor argument (ds), stored unchanged
188  int dbgdi; // second extra constructor argument (di), stored unchanged
189 };
190 
191 // a table of queues, kept as a singly linked chain of DataQueue objects
    // headed by `queues`.
    // NOTE(review): listing line 192 (the class header) is absent from this
    // extract; DataQueue's friend declaration indicates it is QueueTable.
193 {
194 public:
195 
    // NOTE(review): listing line 196 (the declaration that owns this body)
    // is absent from this extract. The body starts with an empty chain.
197  {
198  queues = NULL;
199  }
200 
201  // locate a named data queue
202  DataQueue *locate(const char *name);
203  // locate a named data queue; this variant also takes the owning manager
    // (presumably to perform the lookup under its lock - confirm in the
    // implementation)
204  DataQueue *synchronizedLocate(ServerQueueManager *manager, const char *name);
205  // locate a session data queue (NOTE(review): declaration on listing line 206 is missing)
207  // locate a session data queue (NOTE(review): declaration on listing line 208 is missing)
209  // locate and remove a named data queue
210  DataQueue *remove(const char *name);
211  // NOTE(review): this comment belonged to listing line 212, which is missing from this extract
    // remove the given queue object
213  void remove(DataQueue *q);
214 
    // Unlink `current` from the chain.
    //   current  - the queue to unlink
    //   previous - the queue whose next is current, or NULL when current is
    //              the head of the chain
    // current->next itself is left untouched.
215  inline void removeQueue(DataQueue *current, DataQueue *previous)
216  {
217  if (previous != NULL) // if we have a predecessor
218  {
219  // point the predecessor past the removed queue
220  previous->next = current->next;
221  }
222  else
223  {
    // current was the head, so its successor becomes the new head
224  queues = current->next;
225  }
226  }
227 
    // true when the chain holds no queues
228  inline bool isEmpty()
229  {
230  return queues == NULL;
231  }
232 
233  // add a data queue to the table
234  void add(DataQueue *queue);
235 
236 protected:
237  DataQueue *queues; // head of the data queue chain
238 };
239 
240 // the server instance of the queue manager: owns one table of named
    // queues, one table of session queues, and a mutex for the subsystem.
    // NOTE(review): listing line 241 (the class header) is absent from this
    // extract; the constructor below names the class ServerQueueManager.
242 {
243  friend class DataQueue; // needs access to the instance lock
244  friend class QueueTable; // needs access to the instance lock
245 public:
    // default-constructs both queue tables, names the mutex
    // "ServerQueueManager::lock", and creates it
246  ServerQueueManager() : namedQueues(), sessionQueues(), lock("ServerQueueManager::lock") { lock.create(); }
247 
    // NOTE(review): listing line 248 (a declaration) is absent from this extract.
    // The operations below are declared here and defined elsewhere. Those
    // taking a ServiceMessage & presumably read their arguments from, and
    // return results in, the message - confirm in the implementation.
249  void addToSessionQueue(ServiceMessage &message);
250  void addToNamedQueue(ServiceMessage &message);
251  void pullFromSessionQueue(ServiceMessage &message);
252  void pullFromNamedQueue(ServiceMessage &message);
253  void createSessionQueue(ServiceMessage &message);
    // NOTE(review): listing line 254 (a declaration) is absent from this extract.
255  void createSessionQueue(SessionID session);
256  void createUniqueQueue(ServiceMessage &message);
257  void createNamedQueue(ServiceMessage &message);
258  void openNamedQueue(ServiceMessage &message);
259  void queryNamedQueue(ServiceMessage &message);
260  void nestSessionQueue(ServiceMessage &message);
261  void deleteSessionQueue(ServiceMessage &message);
262  void deleteSessionQueue(DataQueue *queue);
263  void deleteNamedQueue(ServiceMessage &message);
264  void clearSessionQueue(ServiceMessage &message);
265  void clearNamedQueue(ServiceMessage &message);
266  void getSessionQueueCount(ServiceMessage &message);
267  void getNamedQueueCount(ServiceMessage &message);
268  void dispatch(ServiceMessage &message);
269  void cleanupProcessResources(SessionID session);
    // NOTE(review): listing line 272, the return statement of this body, is
    // absent from this extract. Restore it from the original header.
270  inline bool isStoppable()
271  {
273  }
274 
275 
276 protected:
277  QueueTable namedQueues; // our named queues
278  QueueTable sessionQueues; // the session queues
279  SysMutex lock; // our subsystem lock
280 };
281 
282 #endif
char * dupString(const char *oldString)
uintptr_t SessionID
void waitForData()
const char * dbgds
DataQueue * next
void addReference()
DataQueue(const char *ds, int di)
SessionID session
size_t references
QueueItem * lastItem
void addWaiter()
void add(ServiceMessage &message)
QueueItem * firstItem
void addFifo(QueueItem *item)
void pull(ServerQueueManager *manager, ServiceMessage &message)
void clear()
SysSemaphore waitSem
void removeWaiter()
void setName(const char *name)
DataQueue(SessionID s, const char *ds, int di)
size_t itemCount
size_t getItemCount()
void checkWaiters()
size_t removeReference()
bool pullData(ServerQueueManager *manager, ServiceMessage &message)
bool hasWaiters()
bool hasReferences()
const char * queueName
size_t waiters
QueueItem * getFirst()
DataQueue(const char *name, const char *ds, int di)
void addLifo(QueueItem *item)
RexxQueueTime addTime
const char * elementData
size_t size
QueueItem * next
QueueItem(const char *data, size_t s)
void clear()
void setTime()
DataQueue * locate(const char *name)
DataQueue * synchronizedLocate(ServerQueueManager *manager, const char *name)
DataQueue * remove(const char *name)
bool isEmpty()
void add(DataQueue *queue)
DataQueue * queues
void removeQueue(DataQueue *current, DataQueue *previous)
void createNamedQueue(ServiceMessage &message)
void clearSessionQueue(ServiceMessage &message)
void openNamedQueue(ServiceMessage &message)
void addToNamedQueue(ServiceMessage &message)
void pullFromNamedQueue(ServiceMessage &message)
void nestSessionQueue(ServiceMessage &message)
void dispatch(ServiceMessage &message)
void cleanupProcessResources(SessionID session)
void getSessionQueueCount(ServiceMessage &message)
void deleteNamedQueue(ServiceMessage &message)
void createUniqueQueue(ServiceMessage &message)
void addToSessionQueue(ServiceMessage &message)
void deleteSessionQueue(ServiceMessage &message)
DataQueue * getSessionQueue(SessionID session)
void queryNamedQueue(ServiceMessage &message)
QueueTable sessionQueues
void getNamedQueueCount(ServiceMessage &message)
void createSessionQueue(ServiceMessage &message)
void pullFromSessionQueue(ServiceMessage &message)
QueueTable namedQueues
void clearNamedQueue(ServiceMessage &message)
static void releaseResultMemory(void *mem)
void wait(const char *ds, int di)