APIServer.cpp
Go to the documentation of this file.
1 /*----------------------------------------------------------------------------*/
2 /* */
3 /* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */
4 /* Copyright (c) 2005-2009 Rexx Language Association. All rights reserved. */
5 /* */
6 /* This program and the accompanying materials are made available under */
7 /* the terms of the Common Public License v1.0 which accompanies this */
8 /* distribution. A copy is also available at the following address: */
9 /* http://www.ibm.com/developerworks/oss/CPLv1.0.htm */
10 /* */
11 /* Redistribution and use in source and binary forms, with or */
12 /* without modification, are permitted provided that the following */
13 /* conditions are met: */
14 /* */
15 /* Redistributions of source code must retain the above copyright */
16 /* notice, this list of conditions and the following disclaimer. */
17 /* Redistributions in binary form must reproduce the above copyright */
18 /* notice, this list of conditions and the following disclaimer in */
19 /* the documentation and/or other materials provided with the distribution. */
20 /* */
21 /* Neither the name of Rexx Language Association nor the names */
22 /* of its contributors may be used to endorse or promote products */
23 /* derived from this software without specific prior written permission. */
24 /* */
25 /* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */
26 /* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */
27 /* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */
28 /* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */
29 /* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
30 /* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */
31 /* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */
32 /* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */
33 /* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */
34 /* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */
35 /* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
36 /* */
37 /*----------------------------------------------------------------------------*/
38 
39 
40 #include "APIServer.hpp"
41 #include "APIServerInstance.hpp"
42 #include "APIServerThread.hpp"
43 #include <new>
44 #include "ServiceMessage.hpp"
45 #include "ServiceException.hpp"
46 #include <stdio.h>
47 
48 int rxapiCounter = 0; // incremented before every message read in the message loop; printed in the _DEBUG trace lines to tag each request
49 
50 /**
51  * Initialize the server side of the operation: create the server lock and mark the server as active. Throws a heap-allocated ServiceException (SERVER_FAILURE) when the server stream cannot be created.
52  */
54 {
55  // able to initialize our communications pipe?
57  {
58  throw new ServiceException(SERVER_FAILURE, "RexxAPIServer::initServer() Failure creating server stream"); // thrown by pointer; the handlers in this file catch ServiceException * and delete it
59  }
60 
61  lock.create(); // create the mutex.
62  serverActive = true; // the while (serverActive) loops in this file run from here on
63 }
64 
65 
66 /**
67  * Do server shutdown processing and resource cleanup: close the server stream and clear the active flag.
68  */
70 {
71  // close the server stream (flip the sign over to the closed side).
72  server.close();
73  serverActive = false; // lets the while (serverActive) loops in this file finish
74 }
75 
76 
77 /**
78  * Accept client connections for as long as the server is active.
79  * Each connection is handed to a newly created APIServerThread,
80  * which is then started; a NULL connection ends the loop.
81  */
83 {
84  ServiceMessage message; // NOTE(review): appears unused in this function
85 
86  while (serverActive)
87  {
88  // get a new connection.
89  SysServerConnection *connection = server.connect();
90  // we might have some terminated threads waiting
91  // for final resource cleanup...this is a good place to
92  // check for this.
94  // a NULL connection means we have some sort of resource problem...force termination and shutdown.
95  if (connection == NULL)
96  {
97  break;
98  }
99  // create a new thread to service this client connection
100  APIServerThread *thread = new APIServerThread(this, connection); // NOTE(review): presumably deleted by cleanupTerminatedSessions() after being queued via sessionTerminated() -- confirm
101  thread->start();
102  }
103 }
104 
105 /**
106  * Handle a session termination event by queuing the thread for deferred cleanup.
107  *
108  * @param thread The thread whose session has ended; appended to terminatedThreads.
109  */
111 {
112  // hold the server lock while touching terminatedThreads
113  ServerLock sl(this);
114  // add to the queue for cleanup on the next opportunity
115  terminatedThreads.push_back(thread);
116 }
117 
118 
119 
120 /**
121  * Clean up the resources devoted to threads that have
122  * terminated, emptying the terminatedThreads queue.
123  */
125 {
126  // hold the server lock while touching terminatedThreads
127  ServerLock sl(this);
128 
129  // drain the queue from the front, so threads are released in the order they were queued
130  while (!terminatedThreads.empty())
131  {
132  APIServerThread *thread = terminatedThreads.front();
133  terminatedThreads.pop_front();
134  // shut down the resources for this thread and release the memory
135  thread->terminate();
136  delete thread;
137  }
138 }
139 
140 #define ENUM_TEXT(code) case code : return #code; // expands to a case label that returns the enumerator's own name as a string literal
141 
143 {
144  switch(code) // translate a ServerManager value into printable text
145  {
150  default: return "<Unknown>"; // value matched none of the case labels
151  }
152 }
153 
155 {
156  switch(code) // translate a ServerOperation value into printable text
157  {
158  // macro space manager operations
172 
173  // queue manager operations
189 
190  // registration manager operations
199 
200  // global API manager operations
205 
206  default: return "<Unknown>"; // value matched none of the case labels
207  }
208 }
209 
210 /**
211  * Process the Rexx API requests as a queue of messages. Each
212  * message is handled through to completion, so the message
213  * queue is the synchronization point.
214  */
216 {
217  while (serverActive)
218  {
219  ServiceMessage message;
220  try
221  {
222  // read the message.
223  rxapiCounter++;
224  message.readMessage(connection);
225  } catch (ServiceException *e)
226  {
227  // an error here is likely caused by the client closing the connection.
228  // delete both the exception and the connection and terminate the thread.
229 #ifdef _DEBUG
230  // To avoid mixing output from different threads , better to have one dbgprintf instead of two.
231  if (Utilities::traceConcurrency()) dbgprintf(CONCURRENCY_TRACE "...... ... (rxapi) %05i ServiceException caught\n", Utilities::currentThreadId(), NULL, NULL, 0, ' ', rxapiCounter);
232  else dbgprintf("(rxapi) %05i ServiceException caught\n", rxapiCounter);
233 #endif
234  delete e;
235  delete connection;
236  return;
237  }
238 
239 #ifdef _DEBUG
241  {
242  // To avoid mixing output from different threads , better to have each line displayed by one dbgprintf instead of two.
243  dbgprintf(CONCURRENCY_TRACE "...... ... (rxapi) %05i ServiceMessage %s %s\n", Utilities::currentThreadId(), NULL, NULL, 0, ' ', rxapiCounter, ServerManagerText(message.messageTarget), ServerOperationText(message.operation));
244  dbgprintf(CONCURRENCY_TRACE "...... ... (rxapi) %05i session=%i\n", Utilities::currentThreadId(), NULL, NULL, 0, ' ', rxapiCounter, message.session);
245  dbgprintf(CONCURRENCY_TRACE "...... ... (rxapi) %05i nameArg=%s\n", Utilities::currentThreadId(), NULL, NULL, 0, ' ', rxapiCounter, message.nameArg);
246  dbgprintf(CONCURRENCY_TRACE "...... ... (rxapi) %05i userid=%s\n", Utilities::currentThreadId(), NULL, NULL, 0, ' ', rxapiCounter, message.userid);
247  }
248  else
249  {
250  // Here, we don't display concurrency trace, we have one dbgprintf per line, good.
251  dbgprintf("(rxapi) %05i ServiceMessage %s %s\n", rxapiCounter, ServerManagerText(message.messageTarget), ServerOperationText(message.operation));
252  dbgprintf("(rxapi) %05i session=%i\n", rxapiCounter, message.session);
253  dbgprintf("(rxapi) %05i nameArg=%s\n", rxapiCounter, message.nameArg);
254  dbgprintf("(rxapi) %05i userid=%s\n", rxapiCounter, message.userid);
255  }
256 #endif
257  message.result = MESSAGE_OK; // unconditionally zero the result
258  try
259  {
260  // each target handles its own dispatch.
261  switch (message.messageTarget)
262  {
263  case QueueManager:
264  case RegistrationManager:
265  case MacroSpaceManager:
266  {
267  getInstance(message)->dispatch(message);
268  break;
269  }
270 
271  // general API control message.
272  case APIManager:
273  {
274  // this could be a shutdown operation
275  if (message.operation == CLOSE_CONNECTION)
276  {
277  connection->disconnect();
278  delete connection;
279  return;
280  }
281 
282  dispatch(message);
283  break;
284  }
285  }
286  } catch (std::bad_alloc &)
287  {
288  // this catches any C++ memory allocation errors, which we'll just return into a
289  // memory failure result message.
290  message.result = SERVER_ERROR;
291 
292  }
293 
294  try
295  {
296  // ping the message back to the caller
297  message.writeResult(connection);
298  } catch (ServiceMessage *e)
299  {
300  // an error here is likely caused by the client closing the connection.
301  // delete both the exception and the connection and terminate the thread.
302  delete e;
303  delete connection;
304  return;
305  }
306  }
307 }
308 
309 
310 /**
311  * Dispatch an API server control message.
312  *
313  * @param message The control message; its result (and, for CONNECTION_ACTIVE, parameter1) is updated in place.
314  */
316 {
317  message.result = MESSAGE_OK;
318  switch (message.operation)
319  {
320  case SHUTDOWN_SERVER:
321  {
322  shutdown(); // NOTE(review): the ServiceReturn from shutdown() is discarded, so message.result stays MESSAGE_OK either way -- confirm this is intended
323  break;
324  }
325 
326  // TODO: Make sure process cleanup is driven
327  case PROCESS_CLEANUP:
328  {
329  cleanupProcessResources(message);
330  break;
331  }
332 
333  // This is an "are you there" ping. Pass back the version information as a parameter.
334  case CONNECTION_ACTIVE:
335  {
336  message.parameter1 = REXXAPI_VERSION;
337  break;
338  }
339 
340  default: // any other operation is not a valid API manager request
341  message.setExceptionInfo(SERVER_FAILURE, "Invalid API manager operation");
342  break;
343  }
344 }
345 
346 /**
347  * Clean up session-specific resources after a Rexx process
348  * terminates.
349  *
350  * @param message The service message with the session information.
351  */
353 {
354  getInstance(message)->cleanupProcessResources(message); // delegate to the server instance selected for this message
355 }
356 
357 
358 /**
359  * Cause the API server to shut down, provided it is currently stoppable.
360  *
361  * @return SERVER_STOPPED if a stoppage is possible, SERVER_NOT_STOPPABLE
362  * if it was not in a stoppable state.
363  */
365 {
366  // only stop when isStoppable() reports that the server can be stopped
367  if (isStoppable())
368  {
369  serverActive = false;
370  return SERVER_STOPPED;
371  }
372  return SERVER_NOT_STOPPABLE;
373 }
374 
375 /**
376  * Stop the server unconditionally; unlike shutdown(), isStoppable() is not consulted.
377  */
379 {
380  // set the stop flag. NOTE(review): nothing here wakes a loop waiting in server.connect() -- confirm how the listener notices the flag
381  serverActive = false;
382 }
383 
384 /**
385  * Test to see if the api server is in a state where it can be
386  * stopped. A stoppable state implies there are no session
387  * specific resources currently active in the server.
388  *
389  * @return True if every server instance reports itself stoppable (or there are no instances), false otherwise.
390  */
392 {
393  APIServerInstance *current = instances; // NOTE(review): the list is walked without taking ServerLock, unlike getInstance() -- confirm this is safe
394  while (current != NULL)
395  {
396  if (!current->isStoppable()) // one busy instance is enough to veto the stop
397  {
398  return false;
399  }
400  current = current->next;
401  }
402  return true;
403 }
404 
405 
406 /**
407  * Get the server instance associated with a particular userid,
408  * creating a new instance if this is the first time we've processed
409  * a request for this id.
410  *
411  * @param m The service message associated with the request.
412  *
413  * @return A pointer to the correct instance.
414  */
416 {
417  // hold the server lock while searching and possibly extending the instance list
418  ServerLock sl(this);
419 
420  APIServerInstance *current = instances;
421  while (current != NULL)
422  {
423  if (current->isUser(m)) // existing instance matches this message
424  {
425  return current;
426  }
427  current = current->next;
428  }
429 
430  current = new APIServerInstance(m); // no match: create an instance and link it in at the head of the list
431  current->next = instances;
432  instances = current;
433  return current;
434 }
435 
436 
437 /**
438  * Request the global server API lock.
439  */
441 {
442  lock.request("APIServer::requestLock", 0); // the string names the requesting function; NOTE(review): meaning of the trailing 0 is defined by SysMutex -- confirm
443 }
444 
445 
446 /**
447  * Release the global server API lock.
448  */
450 {
451  lock.release("APIServer::releaseLock", 0); // the string names the releasing function; NOTE(review): meaning of the trailing 0 is defined by SysMutex -- confirm
452 }
const char * ServerManagerText(ServerManager code)
Definition: APIServer.cpp:142
#define ENUM_TEXT(code)
Definition: APIServer.cpp:140
int rxapiCounter
Definition: APIServer.cpp:48
const char * ServerOperationText(ServerOperation code)
Definition: APIServer.cpp:154
@ SERVER_FAILURE
ServiceReturn
@ SERVER_ERROR
@ SERVER_STOPPED
@ MESSAGE_OK
@ SERVER_NOT_STOPPABLE
ServerOperation
@ CLEAR_MACRO_SPACE
@ GET_SESSION_QUEUE_COUNT
@ NEST_SESSION_QUEUE
@ CREATE_NAMED_QUEUE
@ QUERY_MACRO
@ CONNECTION_ACTIVE
@ REGISTER_LOAD_LIBRARY
@ NEXT_MACRO_DESCRIPTOR
@ REGISTER_QUERY_LIBRARY
@ MACRO_SEND_NEXT
@ CLEAR_SESSION_QUEUE
@ REGISTER_DROP_LIBRARY
@ REGISTER_ENTRYPOINT
@ GET_MACRO_DESCRIPTOR
@ DELETE_NAMED_QUEUE
@ GET_MACRO_IMAGE
@ ADD_TO_NAMED_QUEUE
@ PROCESS_CLEANUP
@ REGISTER_DROP
@ REGISTER_LIBRARY
@ REMOVE_MACRO
@ SHUTDOWN_SERVER
@ PULL_FROM_SESSION_QUEUE
@ ADD_TO_SESSION_QUEUE
@ QUERY_NAMED_QUEUE
@ OPEN_NAMED_QUEUE
@ REGISTER_QUERY
@ REORDER_MACRO
@ UPDATE_CALLBACK
@ NEXT_MACRO_IMAGE
@ ADD_MACRO
@ PULL_FROM_NAMED_QUEUE
@ CLEAR_NAMED_QUEUE
@ CLOSE_CONNECTION
@ DELETE_SESSION_QUEUE
@ GET_NAMED_QUEUE_COUNT
@ ITERATE_MACRO_DESCRIPTORS
@ ITERATE_MACROS
@ CREATE_SESSION_QUEUE
@ MACRO_RETRIEVE_NEXT
ServerManager
@ QueueManager
@ RegistrationManager
@ APIManager
@ MacroSpaceManager
#define REXX_API_PORT
@ REXXAPI_VERSION
#define CONCURRENCY_TRACE
Definition: Utilities.hpp:50
std::list< APIServerThread * > terminatedThreads
Definition: APIServer.hpp:78
void requestLock()
Definition: APIServer.cpp:440
SysMutex lock
Definition: APIServer.hpp:74
void initServer()
Definition: APIServer.cpp:53
ServiceReturn shutdown()
Definition: APIServer.cpp:364
void terminateServer()
Definition: APIServer.cpp:69
void releaseLock()
Definition: APIServer.cpp:449
APIServerInstance * getInstance(ServiceMessage &m)
Definition: APIServer.cpp:415
void processMessages(SysServerConnection *connection)
Definition: APIServer.cpp:215
void listenForConnections()
Definition: APIServer.cpp:82
virtual void stop()
Definition: APIServer.cpp:378
void sessionTerminated(APIServerThread *thread)
Definition: APIServer.cpp:110
SysServerStream server
Definition: APIServer.hpp:75
void cleanupProcessResources(ServiceMessage &message)
Definition: APIServer.cpp:352
virtual bool isStoppable()
Definition: APIServer.cpp:391
bool serverActive
Definition: APIServer.hpp:76
APIServerInstance * instances
Definition: APIServer.hpp:77
void dispatch(ServiceMessage &message)
Definition: APIServer.cpp:315
void cleanupTerminatedSessions()
Definition: APIServer.cpp:124
APIServerInstance * next
bool isUser(ServiceMessage &message)
void dispatch(ServiceMessage &message)
virtual bool isStoppable()
void cleanupProcessResources(ServiceMessage &message)
void setExceptionInfo(ErrorCode error, const char *message)
ServerManager messageTarget
void readMessage(SysServerConnection *server)
ServerOperation operation
char userid[MAX_USERID_LENGTH]
char nameArg[NAMESIZE]
void writeResult(SysServerConnection *server)
uintptr_t parameter1
ServiceReturn result
void release(const char *ds, int di)
void request(const char *ds, int di)
bool make(const char *)
SysServerConnection * connect()
void terminate()
static wholenumber_t currentThreadId()
static bool traceConcurrency()
void dbgprintf(const char *format,...)