LocalQueueManager.cpp
Go to the documentation of this file.
1 /*----------------------------------------------------------------------------*/
2 /* */
3 /* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */
4 /* Copyright (c) 2005-2009 Rexx Language Association. All rights reserved. */
5 /* */
6 /* This program and the accompanying materials are made available under */
7 /* the terms of the Common Public License v1.0 which accompanies this */
8 /* distribution. A copy is also available at the following address: */
9 /* http://www.ibm.com/developerworks/oss/CPLv1.0.htm */
10 /* */
11 /* Redistribution and use in source and binary forms, with or */
12 /* without modification, are permitted provided that the following */
13 /* conditions are met: */
14 /* */
15 /* Redistributions of source code must retain the above copyright */
16 /* notice, this list of conditions and the following disclaimer. */
17 /* Redistributions in binary form must reproduce the above copyright */
18 /* notice, this list of conditions and the following disclaimer in */
19 /* the documentation and/or other materials provided with the distribution. */
20 /* */
21 /* Neither the name of Rexx Language Association nor the names */
22 /* of its contributors may be used to endorse or promote products */
23 /* derived from this software without specific prior written permission. */
24 /* */
25 /* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */
26 /* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */
27 /* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */
28 /* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */
29 /* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
30 /* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */
31 /* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */
32 /* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */
33 /* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */
34 /* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */
35 /* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
36 /* */
37 /*----------------------------------------------------------------------------*/
38 
39 #include "LocalQueueManager.hpp"
40 #include "Encodings.hpp"
41 #include "LocalAPIManager.hpp"
42 #include "SysLocalAPIManager.hpp"
43 #include "rexx.h"
44 #include "ClientMessage.hpp"
45 #include "Utilities.hpp"
46 #include <stdio.h>
47 #include <ctype.h>
48 
49 // make sure we remember what we do for this process.
51 
52 /**
53  * Initialize the local queue manager.
    *
    * Clears the cached state: localManager is set to NULL and both
    * sessionQueue and sessionID are set to 0.
    *
    * NOTE(review): the definition header (embedded line 55) is not visible in
    * this listing; presumably this is the LocalQueueManager constructor --
    * confirm against the original source.
54  */
56 {
57  localManager = NULL;
58  sessionQueue = 0;
59  sessionID = 0;
60 }
61 
62 /**
63  * Validate a queue name
64  *
65  * @param username The name to validate.
66  */
67 bool LocalQueueManager::validateQueueName(const char *username)
68 {
69  if (username == NULL) /* NULL is OK. */
70  {
71  return true;
72  }
73  // "SESSION" is a reserved name, reject this in this context
74  if (Utilities::strCaselessCompare(username, "SESSION") == 0)
75  {
76  return false;
77  }
78 
79  size_t namelen = strlen(username);
80  if (namelen > 0 && namelen < MAX_QUEUE_NAME_LENGTH)
81  {
82  const char *valptr = username; /* point to name */
83  char ch;
84  while ((ch = *(valptr++)))
85  { /* While have not reached end */
86  ch = toupper(ch); /* convert to upper case */
87  if (!isalpha(ch) && !isdigit(ch) && ch != ch_PERIOD &&
88  ch != ch_QUESTION_MARK && ch != ch_EXCLAMATION && ch != ch_UNDERSCORE)
89  {
90  return false;
91  }
92  }
93  return true;
94  }
95  else
96  {
97  return false;
98  }
99 }
100 
101 
102 /**
103  * Initialize the local (client) queue manager instance.
     *
     * Records the owning API manager in localManager.
104  *
105  * @param a The local API manager instance.
     *
     * NOTE(review): the definition header (embedded line 107) and the
     * statements at embedded lines 110 and 112 are not visible in this
     * listing -- confirm against the original source.
106  */
108 {
109  localManager = a;
111  // find the session queue
     // NOTE(review): the statement this comment describes (embedded line 112)
     // is not visible in this listing.
113 }
114 
115 
116 /**
117  * Handle process termination.
     *
     * If a session queue handle is held (sessionQueue != 0), attempts to
     * delete the session queue, ignoring any ServiceException, and then
     * resets the handle to 0.
     *
     * NOTE(review): the definition header (embedded line 119) and the final
     * "continue the shutdown" statement (embedded line 136) are not visible
     * in this listing.
118  */
120 {
121  // if we have a session queue
122  if (sessionQueue != 0)
123  {
124  try
125  {
126  deleteSessionQueue(); // try to delete this
127  }
128  catch (ServiceException *)
129  {
130  // just ignore any errors here.
     // NOTE(review): this file throws exceptions with "throw new
     // ServiceException(...)" (see pullFromQueue); the pointer caught
     // here is never deleted -- confirm the ownership convention.
131  }
132  // clear this out.
133  sessionQueue = 0;
134  }
135  // continue the shutdown
137 }
138 
139 
140 /**
141  * Create the session queue for this process.
     *
     * Either nests into an inherited session queue (nestSessionQueue) or
     * creates a new one (createSessionQueue).  In the latter case the
     * createdSessionQueue flag is set to true.
142  *
143  * @param session The session identifier, passed through to
     *                nestSessionQueue() / createSessionQueue().
144  *
145  * @return The queue handle returned by whichever of those calls ran.
     *
     * NOTE(review): the definition header (embedded line 147), the condition
     * guarding the first branch (embedded line 153) and the statement at
     * embedded line 167 are not visible in this listing.  Presumably the
     * condition also fills in mysessionQueue before it is passed to
     * nestSessionQueue() -- confirm against the original source.
146  */
148 {
149  // first check to see if we have an env variable set...if we do we
150  // inherit from our parent session
151  QueueHandle mysessionQueue;
152  // we could be inheriting the session queue from a caller process...check first.
154  {
155  // make sure we update the nest count
156  // this might end up creating a new queue instance
157  mysessionQueue = nestSessionQueue(session, mysessionQueue);
158  }
159  else
160  {
161  // create a new session queue
162  mysessionQueue = createSessionQueue(session);
163  // remember that we created this initially. We'll need to create
164  // the queue each time one is needed for this session.
165  createdSessionQueue = true;
166  }
168  return mysessionQueue;
169 }
170 
171 /**
172  * Create a new session queue
     *
     * Sends a request message to the server and returns the handle the
     * server placed in the message's first parameter.
173  *
174  * @param session The session identifier
175  *
176  * @return The handle of the session queue.
     *
     * NOTE(review): the definition header (embedded line 178) and the
     * declaration of "message" (embedded line 180) are not visible in this
     * listing.
177  */
179 {
181  message.send();
182  // the handle is returned in the first parameter
183  return (QueueHandle)message.parameter1;
184 }
185 
186 
187 /**
188  * Create a named queue
     *
     * When a name is supplied it is validated first; when name is NULL the
     * returned name is always a generated one.  In both cases the session id
     * is sent along in parameter1 and the resulting name is copied from the
     * reply into createdName.
189  *
190  * @param name The requested queue name.
191  * @param size The size of the buffer for the returned name.
192  * @param createdName
193  * The buffer for the returned name.
194  * @param dup The duplicate flag.  Set to non-zero when the server
     *            reports DUPLICATE_QUEUE_NAME for a supplied name; always
     *            set to false when name is NULL.
195  *
196  * @return RXQUEUE_BADQNAME if a supplied name fails validation,
197  * otherwise RXQUEUE_OK.
     *
     * NOTE(review): the declarations of "message" in both branches (embedded
     * lines 209 and 223) are not visible in this listing.
198  */
199 RexxReturnCode LocalQueueManager::createNamedQueue(const char *name, size_t size, char *createdName, size_t *dup)
200 {
201  // validation and copying of the request name only is necessary if given.
202  if (name != NULL)
203  {
204  if (!validateQueueName(name)) // make sure this is a valid name
205  {
206  return RXQUEUE_BADQNAME;
207  }
208 
210  // the session id is used for creating unique queue names
211  message.parameter1 = localManager->getSession();
212 
213  message.send();
     // NOTE(review): strncpy() does not NUL-terminate createdName when the
     // source string is size characters or longer.
214  strncpy(createdName, message.nameArg, size);
215  // return the dup name indicator
216  *dup = message.result == DUPLICATE_QUEUE_NAME;
217  // everything worked here.
218  return RXQUEUE_OK;
219  }
220  else
221  {
222  // always returning a generated name
224  // the session id is used for creating unique queue names
225  message.parameter1 = localManager->getSession();
226 
227  message.send();
     // NOTE(review): same strncpy() termination caveat as the branch above.
228  strncpy(createdName, message.nameArg, size);
229  // by definition, this is not a duplicate
230  *dup = false;
231  // everything worked here.
232  return RXQUEUE_OK;
233  }
234 }
235 
236 
237 /**
238  * Open a named queue, reporting whether it already exists.
239  *
240  * @param name The requested queue name.
241  * @param dup The duplicate flag.  Set to non-zero when the server
     *            result is QUEUE_EXISTS, zero otherwise.
242  *
243  * @return RXQUEUE_BADQNAME if the name fails validation, otherwise
     *         RXQUEUE_OK.
     *
     * NOTE(review): the declaration of "message" (embedded line 252) is not
     * visible in this listing.  validateQueueName() accepts NULL, so a NULL
     * name is not rejected here.
244  */
245 RexxReturnCode LocalQueueManager::openNamedQueue(const char *name, size_t *dup)
246 {
247  if (!validateQueueName(name)) // make sure this is a valid name
248  {
249  return RXQUEUE_BADQNAME;
250  }
251 
253 
254  message.send();
255  // return the dup name indicator
256  *dup = message.result == QUEUE_EXISTS;
257  // everything worked here.
258  return RXQUEUE_OK;
259 }
260 
261 /**
262  * Delete the current session queue.
     *
     * @return The server result mapped through mapReturnResult().
     *
     * NOTE(review): the definition header (embedded line 264) and the
     * declaration of "message" (embedded line 266) are not visible in this
     * listing.
263  */
265 {
267  message.send();
268  // map the server result to an API return code.
269  return mapReturnResult(message);
270 }
271 
272 /**
273  * Delete a named queue.
274  *
275  * @param name The name of the queue.
     *
     * @return RXQUEUE_BADQNAME if the name fails validation, otherwise the
     *         server result mapped through mapReturnResult().
     *
     * NOTE(review): the definition header (embedded line 277) and the
     * declaration of "message" (embedded line 284) are not visible in this
     * listing.  validateQueueName() accepts NULL, so a NULL name is not
     * rejected here.
276  */
278 {
279  if (!validateQueueName(name)) // make sure this is a valid name
280  {
281  return RXQUEUE_BADQNAME;
282  }
283 
285  message.send();
286  // map the server result to an API return code.
287  return mapReturnResult(message);
288 }
289 
290 /**
291  * Send a request for a named queue to the server and map its result.
     *
     * NOTE(review): the original header comment here read "Delete a named
     * queue.", identical to the one on the preceding function.  The class
     * also declares queryNamedQueue(const char *name), so presumably this is
     * the query operation -- the definition header (embedded line 295) and
     * the declaration of "message" (embedded line 302) are not visible in
     * this listing; confirm against the original source.
292  *
293  * @param name The name of the queue.
     *
     * @return RXQUEUE_BADQNAME if the name fails validation, otherwise the
     *         server result mapped through mapReturnResult().
294  */
296 {
297  if (!validateQueueName(name)) // make sure this is a valid name
298  {
299  return RXQUEUE_BADQNAME;
300  }
301 
303  message.send();
304  // map the server result to an API return code.
305  return mapReturnResult(message);
306 }
307 
308 /**
309  * Get the count of lines in the session queue.
     *
     * The count is stored into "result" from the reply's first parameter.
310  *
311  * @return The server result mapped through mapReturnResult().
     *
     * NOTE(review): the definition header (embedded line 313) and the
     * declaration of "message" (embedded line 315) are not visible in this
     * listing; "result" is presumably the size_t reference parameter.
312  */
314 {
316 
317  message.send();
318  // the count is returned in the first parameter
319  result = (size_t)message.parameter1;
320  // map the server result to an API return code.
321  return mapReturnResult(message);
322 }
323 
324 /**
325  * Get a queue count from a named queue.
326  *
327  * @param name The queue name.
     * @param result Receives the count taken from the reply's first
     *               parameter.  Left untouched when the name is rejected.
328  *
329  * @return RXQUEUE_BADQNAME if the name fails validation, otherwise the
     *         server result mapped through mapReturnResult().
     *
     * NOTE(review): the declaration of "message" (embedded line 338) is not
     * visible in this listing.
330  */
331 RexxReturnCode LocalQueueManager::getQueueCount(const char *name, size_t &result)
332 {
333  if (!validateQueueName(name)) // make sure this is a valid name
334  {
335  return RXQUEUE_BADQNAME;
336  }
337 
339  message.send();
340  // the count is returned in the first parameter
341  result = (size_t)message.parameter1;
342  // map the server result to an API return code.
343  return mapReturnResult(message);
344 }
345 
346 /**
347  * Remove all items from the session queue.
     *
     * @return The server result mapped through mapReturnResult().
     *
     * NOTE(review): the definition header (embedded line 349) and the
     * declaration of "message" (embedded line 351) are not visible in this
     * listing.
348  */
350 {
352 
353  message.send();
354  // map the server result to an API return code.
355  return mapReturnResult(message);
356 }
357 
358 /**
359  * Remove all items from a named queue.
360  *
361  * @param name The queue name.
     *
     * @return RXQUEUE_BADQNAME if the name fails validation, otherwise the
     *         server result mapped through mapReturnResult().
     *
     * NOTE(review): the definition header (embedded line 363) and the
     * declaration of "message" (embedded line 370) are not visible in this
     * listing.
362  */
364 {
365  if (!validateQueueName(name)) // make sure this is a valid name
366  {
367  return RXQUEUE_BADQNAME;
368  }
369 
371 
372  message.send();
373  // map the server result to an API return code.
374  return mapReturnResult(message);
375 }
376 
377 
378 /**
379  * Add an item to a named queue.
     *
     * The item length goes in parameter1, the ordering flag in parameter2,
     * and the item bytes are attached as the message data.
380  *
381  * @param name The name of the queue.
382  * @param data The data to add
383  * @param lifoFifo The lifo/fifo order flag.
     *
     * @return The server result mapped through mapReturnResult().
     *
     * NOTE(review): the declaration of "message" (embedded line 387) is not
     * visible in this listing.  Unlike the other named-queue operations in
     * this file, no validateQueueName() call is visible here -- confirm
     * whether that is intentional.
384  */
385 RexxReturnCode LocalQueueManager::addToNamedQueue(const char *name, CONSTRXSTRING &data, size_t lifoFifo)
386 {
388  // set the additional arguments
389  message.parameter1 = data.strlength;
390  message.parameter2 = lifoFifo; // make sure we have the add order
391 
392  // attach the queue item to the message.
     // (the cast drops const because setMessageData() takes a void *)
393  message.setMessageData((void *)data.strptr, data.strlength);
394  message.send();
395  // map the server result to an API return code.
396  return mapReturnResult(message);
397 }
398 
399 
400 /**
401  * Add an item to the session queue.
     *
     * The item length goes in parameter1, the ordering flag in parameter2,
     * the session queue handle in parameter3, and the item bytes are
     * attached as the message data.
402  *
403  * @param data The data to add.
404  * @param lifoFifo The lifo/fifo flag.
     *
     * @return The server result mapped through mapReturnResult().
     *
     * NOTE(review): the definition header (embedded line 406) and the
     * declaration of "message" (embedded line 408) are not visible in this
     * listing.
405  */
407 {
409 
410  // set the additional arguments
411  message.parameter1 = data.strlength;
412  message.parameter2 = lifoFifo; // make sure we have the add order
413  message.parameter3 = sessionQueue; // set the session handle next
414 
415  // attach the queue item to the message.
     // (the cast drops const because setMessageData() takes a void *)
416  message.setMessageData((void *)data.strptr, data.strlength);
417  message.send();
418  // map the server result to an API return code.
419  return mapReturnResult(message);
420 }
421 
422 
/**
 * Pull an item from a named queue or from the session queue.
 *
 * @param name      The queue name, or NULL to read from the session queue.
 * @param data      Receives the pulled item.  If the transfer leaves
 *                  data.strptr NULL, a 1-byte buffer is allocated with
 *                  RexxAllocateMemory() so the pointer is never NULL after a
 *                  successful pull.
 * @param waitFlag  Non-zero requests QUEUE_WAIT_FOR_DATA, zero requests
 *                  QUEUE_NO_WAIT.
 * @param timeStamp Optional.  When non-NULL and an item was pulled,
 *                  sizeof(RexxQueueTime) bytes are copied into it from the
 *                  reply's nameArg field.
 *
 * @return The server result mapped through mapReturnResult().
 *
 * Throws a ServiceException pointer (MEMORY_ERROR) if the 1-byte buffer
 * cannot be allocated.
 *
 * NOTE(review): the declaration of "message" (embedded line 425) and the
 * statement at embedded line 433 are not visible in this listing.
 */
423 RexxReturnCode LocalQueueManager::pullFromQueue(const char *name, RXSTRING &data, size_t waitFlag, RexxQueueTime *timeStamp)
424 {
426  // set up for either name or session queue read
427  if (name != NULL)
428  {
     // NOTE(review): unbounded copy into the fixed-size nameArg buffer, and
     // no validateQueueName() call is visible in this function -- confirm
     // that callers bound the length of name.
429  strcpy(message.nameArg, name);
430  }
431  else
432  {
434  message.parameter3 = sessionQueue;
435  }
436  message.parameter1 = waitFlag != 0 ? QUEUE_WAIT_FOR_DATA : QUEUE_NO_WAIT;
437  message.send();
438  if (message.result == QUEUE_ITEM_PULLED)
439  {
440  message.transferMessageData(data);
441  // if this was a null string, then an empty buffer is sent back. Allocate a minimal
442  // buffer to distinguish between nothing and a null string value
443  if (data.strptr == NULL)
444  {
445  data.strptr = (char *)RexxAllocateMemory(1);
446  if (data.strptr == NULL)
447  {
448  throw new ServiceException(MEMORY_ERROR, "LocalQueueManager::pullFromQueue() Failure allocating memory");
449  }
450  }
451  // if the timestamp was requested, return it.
452  if (timeStamp != NULL)
453  {
     // the reply carries the timestamp bytes in the nameArg field
454  memcpy(timeStamp, message.nameArg, sizeof(RexxQueueTime));
455  }
456  }
457  // map the server result to an API return code.
458  return mapReturnResult(message);
459 }
460 
461 
462 /**
463  * Bump the usage count of a session queue when it is
464  * inherited from a parent process.
     *
     * Sends a NEST_SESSION_QUEUE request carrying the session and, in
     * parameter2, the inherited queue handle.
465  *
     * @param session The session identifier used to build the request.
466  * @param q The handle of the session queue.
     *
     * @return The handle the server returned in parameter1 -- either the
     *         nested queue or a newly created one.
     *
     * NOTE(review): the definition header (embedded line 468) is not visible
     * in this listing.
467  */
469 {
470  ClientMessage message(QueueManager, NEST_SESSION_QUEUE, session);
471  message.parameter2 = q;
472  message.send();
473  // we either got back the queue handle for the nested one, or a new
474  // queue.
475  return (QueueHandle)message.parameter1;
476 }
477 
478 
479 /**
480  * Process an exception returned from the server and
481  * map it into an API return code.
482  *
483  * @param e The exception from the server.
484  *
485  * @return The mapped return code.  Any error code not listed below maps
     *         to RXQUEUE_MEMFAIL.
     *
     * NOTE(review): the definition header (embedded line 487) is not visible
     * in this listing.
486  */
488 {
489  switch (e->getErrorCode())
490  {
491  case INVALID_QUEUE_NAME:
492  return RXQUEUE_BADQNAME;
493 
494  case BAD_FIFO_LIFO:
495  return RXQUEUE_PRIORITY;
496 
497  case BAD_WAIT_FLAG:
498  return RXQUEUE_BADWAITFLAG;
499 
500  default:
501  return RXQUEUE_MEMFAIL;
502  }
503 }
504 
505 
506 /**
507  * Map the result field of a completed server message
508  * into an API return code.
509  *
510  * @param m The message whose result field is examined.
511  *
512  * @return The mapped return code.  Any result not listed below maps to
     *         RXQUEUE_OK.
     *
     * NOTE(review): the definition header (embedded line 514) and the case
     * label preceding "return RXQUEUE_NOTREG;" (embedded line 527) are not
     * visible in this listing; presumably that label is
     * QUEUE_DOES_NOT_EXIST -- confirm against the original source.
513  */
515 {
516  switch (m.result)
517  {
518  case INVALID_QUEUE_NAME:
519  return RXQUEUE_BADQNAME;
520 
521  case BAD_FIFO_LIFO:
522  return RXQUEUE_PRIORITY;
523 
524  case BAD_WAIT_FLAG:
525  return RXQUEUE_BADWAITFLAG;
526 
528  return RXQUEUE_NOTREG;
529 
530  case QUEUE_IN_USE:
531  return RXQUEUE_ACCESS;
532 
533  case QUEUE_EMPTY:
534  return RXQUEUE_EMPTY;
535 
536  default:
537  return RXQUEUE_OK;
538  }
539 }
@ ch_UNDERSCORE
Definition: Encodings.hpp:47
@ ch_PERIOD
Definition: Encodings.hpp:44
@ ch_EXCLAMATION
Definition: Encodings.hpp:46
@ ch_QUESTION_MARK
Definition: Encodings.hpp:45
@ MEMORY_ERROR
@ BAD_WAIT_FLAG
@ BAD_FIFO_LIFO
@ INVALID_QUEUE_NAME
@ QUEUE_IN_USE
@ DUPLICATE_QUEUE_NAME
@ QUEUE_EXISTS
@ QUEUE_DOES_NOT_EXIST
@ QUEUE_EMPTY
@ QUEUE_ITEM_PULLED
uintptr_t SessionID
@ GET_SESSION_QUEUE_COUNT
@ NEST_SESSION_QUEUE
@ CREATE_NAMED_QUEUE
@ CLEAR_SESSION_QUEUE
@ DELETE_NAMED_QUEUE
@ ADD_TO_NAMED_QUEUE
@ PULL_FROM_SESSION_QUEUE
@ ADD_TO_SESSION_QUEUE
@ QUERY_NAMED_QUEUE
@ OPEN_NAMED_QUEUE
@ PULL_FROM_NAMED_QUEUE
@ CLEAR_NAMED_QUEUE
@ DELETE_SESSION_QUEUE
@ GET_NAMED_QUEUE_COUNT
@ CREATE_SESSION_QUEUE
@ QueueManager
@ QUEUE_WAIT_FOR_DATA
@ QUEUE_NO_WAIT
SessionID getSession()
virtual void terminateProcess()
virtual void initializeLocal(LocalAPIManager *a)
virtual RexxReturnCode processServiceException(ServiceException *e)
RexxReturnCode clearSessionQueue()
QueueHandle createSessionQueue(SessionID session)
QueueHandle initializeSessionQueue(SessionID s)
RexxReturnCode mapReturnResult(ServiceMessage &m)
RexxReturnCode deleteNamedQueue(const char *name)
RexxReturnCode deleteSessionQueue()
RexxReturnCode getSessionQueueCount(size_t &)
RexxReturnCode getQueueCount(const char *name, size_t &)
RexxReturnCode openNamedQueue(const char *name, size_t *dup)
bool validateQueueName(const char *username)
RexxReturnCode addToSessionQueue(CONSTRXSTRING &data, size_t lifoFifo)
RexxReturnCode pullFromQueue(const char *name, RXSTRING &data, size_t waitFlag, RexxQueueTime *timeStamp)
RexxReturnCode queryNamedQueue(const char *name)
RexxReturnCode clearNamedQueue(const char *name)
RexxReturnCode createNamedQueue(const char *name, size_t size, char *createdName, size_t *dup)
void initializeLocal(LocalAPIManager *a)
RexxReturnCode addToNamedQueue(const char *name, CONSTRXSTRING &data, size_t lifoFifo)
static bool createdSessionQueue
QueueHandle nestSessionQueue(SessionID s, QueueHandle q)
virtual void terminateProcess()
LocalAPIManager * localManager
ErrorCode getErrorCode()
uintptr_t parameter3
ServerOperation operation
void setMessageData(void *data, size_t length)
char nameArg[NAMESIZE]
void transferMessageData(RXSTRING &data)
uintptr_t parameter1
ServiceReturn result
uintptr_t parameter2
static void setActiveSessionQueue(QueueHandle sessionQueue)
static bool getActiveSessionQueue(QueueHandle &sessionQueue)
static int strCaselessCompare(const char *opt1, const char *opt2)
Definition: Utilities.cpp:82
#define MAX_QUEUE_NAME_LENGTH
Definition: rexx.h:957
void *REXXENTRY RexxAllocateMemory(size_t)
int RexxReturnCode
Definition: rexx.h:73
#define RXQUEUE_BADWAITFLAG
Definition: rexxapidefs.h:257
#define RXQUEUE_EMPTY
Definition: rexxapidefs.h:258
#define RXQUEUE_OK
Definition: rexxapidefs.h:248
#define RXQUEUE_BADQNAME
Definition: rexxapidefs.h:255
#define RXQUEUE_ACCESS
Definition: rexxapidefs.h:260
#define RXQUEUE_PRIORITY
Definition: rexxapidefs.h:256
#define RXQUEUE_NOTREG
Definition: rexxapidefs.h:259
#define RXQUEUE_MEMFAIL
Definition: rexxapidefs.h:262
const char * strptr
Definition: rexx.h:163
size_t strlength
Definition: rexx.h:162
char * strptr
Definition: rexx.h:158