QueueManager.cpp
Go to the documentation of this file.
1 /*----------------------------------------------------------------------------*/
2 /* */
3 /* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */
4 /* Copyright (c) 2005-2009 Rexx Language Association. All rights reserved. */
5 /* */
6 /* This program and the accompanying materials are made available under */
7 /* the terms of the Common Public License v1.0 which accompanies this */
8 /* distribution. A copy is also available at the following address: */
9 /* http://www.ibm.com/developerworks/oss/CPLv1.0.htm */
10 /* */
11 /* Redistribution and use in source and binary forms, with or */
12 /* without modification, are permitted provided that the following */
13 /* conditions are met: */
14 /* */
15 /* Redistributions of source code must retain the above copyright */
16 /* notice, this list of conditions and the following disclaimer. */
17 /* Redistributions in binary form must reproduce the above copyright */
18 /* notice, this list of conditions and the following disclaimer in */
19 /* the documentation and/or other materials provided with the distribution. */
20 /* */
21 /* Neither the name of Rexx Language Association nor the names */
22 /* of its contributors may be used to endorse or promote products */
23 /* derived from this software without specific prior written permission. */
24 /* */
25 /* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */
26 /* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */
27 /* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */
28 /* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */
29 /* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
30 /* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */
31 /* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */
32 /* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */
33 /* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */
34 /* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */
35 /* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
36 /* */
37 /*----------------------------------------------------------------------------*/
38 
39 #include "QueueManager.hpp"
40 #include "APIServer.hpp"
41 #include <time.h>
42 #include <new>
43 #include "stdio.h"
44 #include "Utilities.hpp"
45 #include "SynchronizedBlock.hpp"
46 
47 extern int rxapiCounter;
48 
49 /**
50  * Set the update time for a macro item.
51  */
53 {
54  time_t timer = time(NULL);
55  struct tm *time = localtime(&timer);
56  addTime.year = time->tm_year;
57  addTime.month = time->tm_mon;
58  addTime.day = time->tm_mday;
59  addTime.hours = time->tm_hour;
60  addTime.minutes = time->tm_min;
61  addTime.seconds = time->tm_sec;
63  addTime.weekday = time->tm_wday;
64 }
65 
66 /**
67  * Perform cleanup for a delete data queue.
68  */
70 {
71  waitSem.close(); // make sure our semaphore cleans up if we've used it.
72  clear();
73  if (queueName != NULL)
74  {
75  delete [] queueName; // also delete the name
76  }
77 }
78 
79 /**
80  * Clear all entries from the data queue.
81  */
83 {
84  // now clear the queue
85  QueueItem *item = firstItem;
86  while (item != NULL)
87  {
88  // move to the next one and delete
89  QueueItem *localnext = item->next;
90  delete item;
91  item = localnext;
92  }
93  firstItem = NULL;
94  lastItem = NULL;
95  itemCount = 0;
96 }
97 
98 /**
99  * Process a queue add operation.
100  *
101  * @param message The service message for the add operation.
102  */
104 {
105  const char *queueData = (const char *)message.getMessageData();
106  size_t itemLength = (size_t)message.parameter1;
107  size_t order = (size_t)message.parameter2;
108  // detach the message data from the message so the controller
109  // doesn't free this.
110  message.clearMessageData();
111  QueueItem *item = new QueueItem(queueData, itemLength);
112  if (order == QUEUE_LIFO)
113  {
114  addLifo(item);
115  }
116  else
117  {
118  addFifo(item);
119  }
120  message.setResult(QUEUE_ITEM_ADDED);
121 }
122 
123 
124 /**
125  * Add an item to a queue in LIFO order.
126  *
127  * @param item The item to add.
128  */
130 {
131  item->next = firstItem;
132  firstItem = item;
133  if (lastItem == NULL)
134  {
135  lastItem = item;
136  }
137  itemCount++;
138  // make sure we notify any waiters that something has arrived.
139  checkWaiters();
140 }
141 
142 /**
143  * Add an item to the queue in FIFO order.
144  *
145  * @param item The item to add.
146  */
148 {
149  if (lastItem == NULL)
150  {
151  firstItem = item;
152  lastItem = item;
153  }
154  else
155  {
156  lastItem->next = item;
157  lastItem = item;
158  }
159  itemCount++;
160  // make sure we notify any waiters that something has arrived.
161  checkWaiters();
162 }
163 
164 
165 /**
166  * Pull the first item off the queue.
167  *
168  * @return The QueueItem at the head of the queue, or NULL if the
169  * queue is empty.
170  */
172 {
173  QueueItem *item = firstItem;
174  if (item != NULL)
175  {
176  firstItem = item->next;
177  if (firstItem == NULL)
178  {
179  lastItem = NULL;
180  }
181  itemCount--;
182  }
183  return item;
184 }
185 
186 
187 /**
188  * Attempt to pull data from a data queue and attach it to a
189  * return message.
190  *
191  * @param message The message being processed.
192  *
193  * @return true if the queue had a data item, false if it was currently
194  * empty.
195  */
197 {
198  Lock managerLock(manager->lock, "DataQueue::pullData", 0); // this needs synchronization here
199 
200  // now that we have the lock, clear the wait sem unconditionally.
201  // this should be safe, as it is either already clear, and has waiters camped
202  // on it, or it has recently been posted, and since we're going to read data,
203  // we either want it cleared for others to wait, or we're going to need to wait
204  // on it ourself.
205  waitSem.reset();
206  QueueItem *item = getFirst();
207  // if we have an item, return it.
208  if (item != NULL)
209  {
210  // make sure we pass the total length back
211  message.parameter1 = item->size;
212  // copy the time stamp into the now-unused name buffer
213  memcpy(message.nameArg, &item->addTime, sizeof(RexxQueueTime));
214  // the message will delete the queue data once it has been sent
215  // back to the client.
216  message.setMessageData((void *)item->elementData, item->size);
217  // this data needs to be freed once the result is sent back, if we allocated it
218  message.retainMessageData = false;
219  // we've taken the data from the item, so clear it out before we delete.
220  item->clear();
221  // we're done with this, let it go.
222  delete item;
223  message.setResult(QUEUE_ITEM_PULLED);
224  return true;
225  }
226  // Move along, nothing to see here....
227  return false;
228 }
229 
230 
231 /**
232  * Pull an item from the front of the queue.
233  *
234  * @param message The message from the client.
235  */
237 {
238  // this might take multiple times if we have to wait
239  size_t noWait = (size_t)message.parameter1;
240 
241  // if the pull succeeded, return now.
242  if (pullData(manager, message))
243  {
244  return;
245  }
246 
247  // nowait value
248  if (noWait != QUEUE_WAIT_FOR_DATA)
249  {
250  message.setResult(QUEUE_EMPTY); // nada
251  return;
252  }
253  else
254  {
255  {
256  Lock managerLock(manager->lock, "DataQueue::pull#1", 0);
257  // indicate we have another waiting queue
258  addWaiter();
259  }
260  // now keep looping until we actually get an item
261  while (true)
262  {
263  waitForData();
264  {
265  // see if this is doable now without waiting...there was a window of
266  // opportunity for an item to be added.
267  if (pullData(manager, message))
268  {
269  {
270  Lock managerLock(manager->lock, "DataQueue::pull#2", 0);
271  // remove us as a waiter
272  removeWaiter();
273  }
274  return;
275  }
276  }
277  }
278  }
279 }
280 
281 /**
282  * locate a named data queue
283  *
284  * @param name The target data queue name.
285  *
286  * @return The DataQueue descriptor, or NULL if it does not
287  * exist.
288  */
289 DataQueue *QueueTable::locate(const char *name)
290 {
291  DataQueue *current = queues; // start the search
292  DataQueue *previous = NULL; // no previous one
293 
294  /* while more queues */
295  while (current != NULL)
296  {
297  // find the one we want?
298  if (Utilities::strCaselessCompare(name, current->queueName) == 0)
299  {
300  return current;
301  }
302  previous = current; /* remember this block */
303  current = current->next; /* step to the next block */
304  }
305  return NULL;
306 }
307 
308 /**
309  * locate a named data queue, with session manager locking
310  *
311  * @param name The target data queue name.
312  *
313  * @return The DataQueue descriptor, or NULL if it does not
314  * exist.
315  */
317 {
318  Lock managerLock(manager->lock, "QueueTable::synchronizedLocate", 0); // this needs synchronization here
319  return locate(name);
320 }
321 
322 /**
323  * locate a session data queue
324  *
325  * @param id The session ID of the queue.
326  *
327  * @return The DataQueue for the session, or NULL if no queue
328  * exists for that session.
329  */
331 {
332  DataQueue *current = queues; // start the search
333  DataQueue *previous = NULL; // no previous one
334 
335  while (current != NULL) // while more queues
336  {
337  // find the one we want?
338  if (current->session == id)
339  {
340  return current;
341  }
342  previous = current; // remember this block
343  current = current->next; // to the next block
344  }
345  return NULL; // return NULL if not located
346 }
347 
348 /**
349  * locate a session data queue, with session manager locking
350  *
351  * @param id The session ID of the queue.
352  *
353  * @return The DataQueue for the session, or NULL if no queue
354  * exists for that session.
355  */
357 {
358  Lock managerLock(manager->lock, "QueueTable::synchronizedLocate", 0); // this needs synchronization here
359  return locate(id);
360 }
361 
362 
363 /**
364  * locate and remove a named data queue
365  *
366  * @param name The queue name.
367  *
368  * @return The DataQueue matching the name, or NULL if it doesn't exist.
369  */
370 DataQueue *QueueTable::remove(const char *name)
371 {
372  DataQueue *current = queues; // start the search
373  DataQueue *previous = NULL; // no previous one
374 
375  while (current != NULL) /* while more queues */
376  {
377  // find the one we want?
378  if (Utilities::strCaselessCompare(name, current->queueName) == 0)
379  {
380  // move this to the front so we find it quickly
381  removeQueue(current, previous);
382  return current;
383  }
384  previous = current; /* remember this block */
385  current = current->next; /* step to the next block */
386  }
387  return NULL;
388 }
389 
390 
391 /**
392  * locate and remove a specific data queue
393  *
394  * @param q The queue to remove.
395  */
397 {
398  DataQueue *current = queues; // start the search
399  DataQueue *previous = NULL; // no previous one
400 
401  while (current != NULL) /* while more queues */
402  {
403  // find the one we want?
404  if (current == q)
405  {
406  // move this to the front so we find it quickly
407  removeQueue(current, previous);
408  }
409  previous = current; /* remember this block */
410  current = current->next; /* step to the next block */
411  }
412 }
413 
414 /**
415  * locate and remove a session data queue
416  *
417  * @param id The session identifier of the queue to remove.
418  *
419  * @return The removed queue; if none matches, a new queue is created, added to the table and returned.
420  */
422 {
423  DataQueue *current = queues; // start the search
424  DataQueue *previous = NULL; // no previous one
425 
426  while (current != NULL) // while more queues
427  {
428  // find the one we want?
429  if (current->session == id)
430  {
431  // move this to the front so we find it quickly
432  removeQueue(current, previous);
433  return current;
434  }
435  previous = current; // remember this block
436  current = current->next; // to the next block
437  }
438  current = new DataQueue(id, "QueueTable::remove", 0); // create a new session queue
439  add(current); // and add it to the table.
440  return current;
441 }
442 
443 
444 /**
445  * add a named data queue to our list.
446  *
447  * @param queue The new queue to add.
448  */
450 {
451  queue->next = queues;
452  queues = queue;
453 }
454 
455 
456 // Add an item to the session queue. The message arguments have the
457 // following meanings:
458 //
459 // parameter1 -- length of the queue item.
460 // parameter2 -- lifo/fifo flag
461 // parameter3 -- handle of the session queue
463 {
464  // We can go directly to the referenced queue.
465  DataQueue *queue = getSessionQueue((SessionID)message.parameter3);
466  queue->add(message);
467 }
468 
469 // Add an item to a named queue. The message arguments have the
470 // following meanings:
471 //
472 // parameter1 -- length of the queue item.
473 // parameter2 -- lifo/fifo flag
474 // nameArg -- ASCII-Z name of the queue
476 {
477  DataQueue *queue = namedQueues.locate(message.nameArg);
478  // not previously created?
479  if (queue == NULL)
480  {
481  // this is an error
483  }
484  // queue exists, so add the item
485  else
486  {
487  queue->add(message);
488  }
489 }
490 
491 
492 // Pull an item from a session queue. The message arguments have the
493 // following meanings:
494 //
495 // parameter1 -- NOWAIT flag, indicating whether we should wait for data
496 // parameter2 -- the ENDWAIT flag to indicate this was a waiting process
497 // parameter3 -- session queue handle
499 {
500  DataQueue *queue = getSessionQueue((SessionID)message.parameter3);
501  queue->pull(this, message);
502 }
503 
504 
505 // Pull an item from a named queue. The message arguments have the
506 // following meanings:
507 //
508 // parameter1 -- NOWAIT flag, indicating whether we should wait for data
509 // parameter2 -- the ENDWAIT flag to indicate this was a waiting process
510 // nameArg -- ASCII-Z name of the queue
512 {
513  // we're holding the lock yet, so we need to use the locate
514  // method that grabs the lock first. If we don't, then we run
515  // the risk that the queue will be reordered while we're searching.
516  // The results will be bad, definitely very bad.
517  DataQueue *queue = namedQueues.synchronizedLocate(this, message.nameArg);
518  // not previously created?
519  if (queue == NULL)
520  {
521  // this is an error
523  }
524  // queue exists, so add the item
525  else
526  {
527  queue->pull(this, message);
528  }
529 }
530 
531 // locate a session queue from session id. This will create it, if necessary
532 //
533 // parameter1 -- caller's session id (replaced by queue handle on return);
535 {
536  // this could be redundant, but if called as a result of a PULL operation,
537  // we're not holding the lock yet. We need to nest the call.
538  Lock managerLock(lock, "ServerQueueManager::getSessionQueue", 0);
539 
540  DataQueue *queue = sessionQueues.locate(session);
541  // not previously created?
542  if (queue == NULL)
543  {
544  // this is easy, just create a new queue and add it to the table
545  queue = new DataQueue(session, "ServerQueueManager::getSessionQueue", 0);
546  sessionQueues.add(queue);
547  }
548  return queue;
549 }
550 
551 // Create a session queue. The message arguments have the
552 // following meanings:
553 //
554 // parameter1 -- caller's session id (replaced by queue handle on return);
556 {
557  DataQueue *queue = sessionQueues.locate(session);
558  // not previously created?
559  if (queue == NULL)
560  {
561  // this is easy, just create a new queue and add it to the table
562  queue = new DataQueue(session, "ServerQueueManager::createSessionQueue", 0);
563  sessionQueues.add(queue);
564  }
565  // name collision...we need to update
566  else
567  {
568  // we have nested usage of the session queues,
569  // so we need to bump the nesting counter.
570  queue->addReference();
571  }
572 }
573 
574 // Create a session queue. The message arguments have the
575 // following meanings:
576 //
577 // parameter1 -- caller's session id (replaced by queue handle on return);
579 {
580  SessionID session = (SessionID)message.parameter1;
581  createSessionQueue(session);
582  // the session id is used as the handle
583  message.parameter1 = (uintptr_t)session;
584  message.setResult(QUEUE_CREATED);
585 }
586 
587 /**
588  * Create a queue with a unique name.
589  *
590  * @param message The inbound service message.
591  */
593 {
594  DataQueue *queue = new DataQueue("ServerQueueManager::createUniqueQueue", 0); // get an anonymous queue
595  // the queue pointer makes a good starting point for an anonymous tag
596  uintptr_t tag = (uintptr_t)queue;
597  for (;;) // we need to loop until we get a unique one.
598  {
599  char session[32];
600  char tagstring[32];
601 
602  // linux uses a 0x prefix for pointers, Windows doesn't. Just pull off
603  // the address characters without any "0x" prefix.
604  snprintf(session, sizeof session, "%p", (void *)message.parameter1);
605  snprintf(tagstring, sizeof tagstring, "%p", (void *)tag);
606 
607  // message parameter1 is the session identifier.
608  snprintf(message.nameArg, sizeof message.nameArg, "S%sQ%s", (char *)(session[1] == 'x' ? session + 2 : session),
609  (char *)(tagstring[1] == 'x' ? tagstring + 2 : tagstring));
610  if (namedQueues.locate(message.nameArg) == 0)
611  {
612  // set the name
613  queue->setName(message.nameArg);
614  // got a good one, add it and return
615  namedQueues.add(queue);
616  return;
617  }
618  tag++; // try a new number
619  }
620 }
621 
622 
623 // Create a named queue. The message arguments have the
624 // following meanings:
625 //
626 // parameter1 -- caller's session id, only used with duplicates
627 // nameArg -- ASCII-Z name of the queue
629 {
630  // no user-specified name?
631  if (strlen(message.nameArg) == 0)
632  {
633  // create a uniquely named one
634  createUniqueQueue(message);
635  message.setResult(QUEUE_CREATED);
636  return;
637  }
638 
639  DataQueue *queue = namedQueues.locate(message.nameArg);
640  // not previously created?
641  if (queue == NULL)
642  {
643  // this is easy, just create a new queue and add it to the table
644  queue = new DataQueue(message.nameArg, "ServerQueueManager::createNamedQueue", 0);
645  namedQueues.add(queue);
646  message.setResult(QUEUE_CREATED);
647  return;
648  }
649  // name collision...we need to update
650  else
651  {
653  // create a uniquely named one
654  createUniqueQueue(message);
655  }
656 }
657 
658 
659 // Open (or create) a named queue. The message arguments have the
660 // following meanings:
661 //
662 // parameter1 -- caller's session id, only used with duplicates
663 // nameArg -- ASCII-Z name of the queue
665 {
666  DataQueue *queue = namedQueues.locate(message.nameArg);
667  // not previously created?
668  if (queue == NULL)
669  {
670  // this is easy, just create a new queue and add it to the table
671  queue = new DataQueue(message.nameArg, "ServerQueueManager::openNamedQueue", 0);
672  namedQueues.add(queue);
673  message.setResult(QUEUE_CREATED);
674  }
675  else
676  {
677  // indicate this already exists
678  message.setResult(QUEUE_EXISTS);
679  }
680 }
681 
682 
683 // Query a named queue. The message arguments have the
684 // following meanings:
685 //
686 // parameter1 -- caller's session id, only used with duplicates
687 // nameArg -- ASCII-Z name of the queue
689 {
690  DataQueue *queue = namedQueues.locate(message.nameArg);
691  // not previously created?
692  if (queue == NULL)
693  {
694  // not here
696  }
697  else
698  {
699  // indicate this already exists
700  message.setResult(QUEUE_EXISTS);
701  }
702 }
703 
704 
705 // Increment the reference count on a session queue. The message arguments have the
706 // following meanings:
707 //
708 // parameter1 -- session queue handle
710 {
711  SessionID session = (SessionID)message.parameter2;
712  // this will create one associated with the session or force it to be
713  // nested if it doesn't exist
714  createSessionQueue(session);
715  message.parameter1 = (uintptr_t)session;
716  message.setResult(QUEUE_OK);
717 }
718 
719 
720 // Delete a session queue. The message arguments have the
721 // following meanings:
722 //
723 // parameter1 -- session queue handle
725 {
726  SessionID session = (SessionID)message.parameter1;
727  DataQueue *queue = getSessionQueue(session);
728  message.setResult(QUEUE_DELETED);
729  // do we have clients waiting for pull data?
730  if (queue->hasWaiters())
731  {
732  message.setResult(QUEUE_IN_USE);
733  }
734  // still have references?
735  else if (queue->removeReference() == 0)
736  {
737  sessionQueues.remove(queue); // remove from table and
738  delete queue; // delete this
739  }
740 }
741 
742 
743 // Delete a session queue. The message arguments have the
744 // following meanings:
745 //
746 // parameter1 -- session queue handle
748 {
749  // do we have clients waiting for pull data?
750  if (!queue->hasWaiters())
751  {
752  // still have references?
753  if (queue->removeReference() == 0)
754  {
755  sessionQueues.remove(queue); // remove from table and
756  delete queue; // delete this
757  }
758  }
759 }
760 
761 // Delete a named queue. The message arguments have the
762 // following meanings:
763 //
764 // nameArg -- ASCII-Z name of the queue
766 {
767  DataQueue *queue = namedQueues.locate(message.nameArg);
768  message.setResult(QUEUE_DELETED);
769  // not previously created?
770  if (queue == NULL)
771  {
773  }
774  else
775  {
776  // do we have clients waiting for pull data?
777  if (queue->hasWaiters())
778  {
779  message.setResult(QUEUE_IN_USE);
780  }
781  else
782  {
783  namedQueues.remove(queue); // remove the queue item
784  delete queue; // delete this
785  }
786  }
787 }
788 
789 
790 // get the count for a session queue. The message arguments have the
791 // following meanings:
792 //
793 // parameter1 -- handle of the session queue (updated to queue count on return)
795 {
796  SessionID session = (SessionID)message.parameter1;
797  DataQueue *queue = getSessionQueue(session);
798  // session queues are automatically created, so we always have
799  // an item count
800  message.parameter1 = queue->getItemCount();
801  message.setResult(QUEUE_EXISTS);
802 }
803 
804 
805 // Get the item count of a named queue. The message arguments have the
806 // following meanings:
807 //
808 // nameArg -- ASCII-Z name of the queue
810 {
811  DataQueue *queue = namedQueues.locate(message.nameArg);
812  // not previously created?
813  if (queue == NULL)
814  {
815  // this is an error
817  }
818  // queue exists, so add the item
819  else
820  {
821  message.parameter1 = queue->getItemCount();
822  message.setResult(QUEUE_EXISTS);
823  }
824 }
825 
826 
827 // clear any entries from a session queue. The message arguments have the
828 // following meanings:
829 //
830 // parameter1 -- handle of the session queue (updated to queue count on return)
832 {
833  SessionID session = (SessionID)message.parameter1;
834  DataQueue *queue = getSessionQueue(session);
835  // session queues are automatically created, so we always have
836  // an item count
837  queue->clear();
838  message.setResult(QUEUE_EXISTS);
839 }
840 
841 
842 // clear any entries from a named queue. The message arguments have the
843 // following meanings:
844 //
845 // nameArg -- ASCII-Z name of the queue
847 {
848  DataQueue *queue = namedQueues.locate(message.nameArg);
849  // not previously created?
850  if (queue == NULL)
851  {
852  // this is an error
854  }
855  // queue exists, so add the item
856  else
857  {
858  queue->clear();
859  message.setResult(QUEUE_EXISTS);
860  }
861 }
862 
863 
864 /**
865  * Dispatch a queue manager message to the appropriate action.
866  *
867  * @param message The inbound message from the client.
868  */
870 {
871  // the pull operations might have to wait for an item to be added,
872  // so they need to control their own locking mechanisms
873  if (message.operation == PULL_FROM_NAMED_QUEUE)
874  {
875  pullFromNamedQueue(message);
876  }
877  else if (message.operation == PULL_FROM_SESSION_QUEUE)
878  {
879  pullFromSessionQueue(message);
880  }
881  else {
882  Lock managerLock(lock, "ServerQueueManager::dispatch", 0); // we need to synchronize on this instance
883  switch (message.operation)
884  {
885  case NEST_SESSION_QUEUE:
886  nestSessionQueue(message);
887  break;
889  createSessionQueue(message);
890  break;
891  case CREATE_NAMED_QUEUE:
892  createNamedQueue(message);
893  break;
894  case OPEN_NAMED_QUEUE:
895  openNamedQueue(message);
896  break;
897  case QUERY_NAMED_QUEUE:
898  queryNamedQueue(message);
899  break;
901  deleteSessionQueue(message);
902  break;
903  case DELETE_NAMED_QUEUE:
904  deleteNamedQueue(message);
905  break;
907  getSessionQueueCount(message);
908  break;
910  getNamedQueueCount(message);
911  break;
912  case CLEAR_SESSION_QUEUE:
913  clearSessionQueue(message);
914  break;
915  case CLEAR_NAMED_QUEUE:
916  clearNamedQueue(message);
917  break;
918  case ADD_TO_NAMED_QUEUE:
919  addToNamedQueue(message);
920  break;
922  addToSessionQueue(message);
923  break;
924  default:
925  message.setExceptionInfo(SERVER_FAILURE, "Invalid queue manager operation");
926  break;
927  }
928  }
929 }
930 
932 {
933  DataQueue *queue = sessionQueues.locate(session);
934  // if the queue exists for this session, turn this into a
935  // delete request.
936  if (queue != NULL)
937  {
938  deleteSessionQueue((DataQueue *)queue);
939  }
940 }
int rxapiCounter
Definition: APIServer.cpp:48
@ SERVER_FAILURE
@ QUEUE_IN_USE
@ QUEUE_CREATED
@ DUPLICATE_QUEUE_NAME
@ QUEUE_DELETED
@ QUEUE_EXISTS
@ QUEUE_DOES_NOT_EXIST
@ QUEUE_EMPTY
@ QUEUE_ITEM_PULLED
@ QUEUE_OK
@ QUEUE_ITEM_ADDED
uintptr_t SessionID
@ GET_SESSION_QUEUE_COUNT
@ NEST_SESSION_QUEUE
@ CREATE_NAMED_QUEUE
@ CLEAR_SESSION_QUEUE
@ DELETE_NAMED_QUEUE
@ ADD_TO_NAMED_QUEUE
@ PULL_FROM_SESSION_QUEUE
@ ADD_TO_SESSION_QUEUE
@ QUERY_NAMED_QUEUE
@ OPEN_NAMED_QUEUE
@ PULL_FROM_NAMED_QUEUE
@ CLEAR_NAMED_QUEUE
@ DELETE_SESSION_QUEUE
@ GET_NAMED_QUEUE_COUNT
@ CREATE_SESSION_QUEUE
@ QUEUE_WAIT_FOR_DATA
@ QUEUE_LIFO
void waitForData()
DataQueue * next
void addReference()
SessionID session
QueueItem * lastItem
void addWaiter()
void add(ServiceMessage &message)
QueueItem * firstItem
void addFifo(QueueItem *item)
void pull(ServerQueueManager *manager, ServiceMessage &message)
void clear()
SysSemaphore waitSem
void removeWaiter()
void setName(const char *name)
size_t itemCount
size_t getItemCount()
void checkWaiters()
size_t removeReference()
bool pullData(ServerQueueManager *manager, ServiceMessage &message)
bool hasWaiters()
const char * queueName
QueueItem * getFirst()
void addLifo(QueueItem *item)
RexxQueueTime addTime
const char * elementData
size_t size
QueueItem * next
void clear()
void setTime()
DataQueue * locate(const char *name)
DataQueue * synchronizedLocate(ServerQueueManager *manager, const char *name)
DataQueue * remove(const char *name)
void add(DataQueue *queue)
DataQueue * queues
void removeQueue(DataQueue *current, DataQueue *previous)
void createNamedQueue(ServiceMessage &message)
void clearSessionQueue(ServiceMessage &message)
void openNamedQueue(ServiceMessage &message)
void addToNamedQueue(ServiceMessage &message)
void pullFromNamedQueue(ServiceMessage &message)
void nestSessionQueue(ServiceMessage &message)
void dispatch(ServiceMessage &message)
void cleanupProcessResources(SessionID session)
void getSessionQueueCount(ServiceMessage &message)
void deleteNamedQueue(ServiceMessage &message)
void createUniqueQueue(ServiceMessage &message)
void addToSessionQueue(ServiceMessage &message)
void deleteSessionQueue(ServiceMessage &message)
DataQueue * getSessionQueue(SessionID session)
void queryNamedQueue(ServiceMessage &message)
QueueTable sessionQueues
void getNamedQueueCount(ServiceMessage &message)
void createSessionQueue(ServiceMessage &message)
void pullFromSessionQueue(ServiceMessage &message)
friend class DataQueue
QueueTable namedQueues
void clearNamedQueue(ServiceMessage &message)
void setExceptionInfo(ErrorCode error, const char *message)
uintptr_t parameter3
ServerOperation operation
void setMessageData(void *data, size_t length)
char nameArg[NAMESIZE]
uintptr_t parameter1
void setResult(ServiceReturn code)
void * getMessageData()
uintptr_t parameter2
static int strCaselessCompare(const char *opt1, const char *opt2)
Definition: Utilities.cpp:102
if(!yymsg) yymsg
uint32_t microseconds
Definition: rexx.h:248
uint16_t year
Definition: rexx.h:246
uint16_t seconds
Definition: rexx.h:242
uint16_t day
Definition: rexx.h:244
uint16_t month
Definition: rexx.h:245
uint16_t hours
Definition: rexx.h:240
uint16_t minutes
Definition: rexx.h:241
uint16_t weekday
Definition: rexx.h:247
UINT_PTR uintptr_t