APIServer.cpp
Go to the documentation of this file.
1 /*----------------------------------------------------------------------------*/
2 /* */
3 /* Copyright (c) 1995, 2004 IBM Corporation. All rights reserved. */
4 /* Copyright (c) 2005-2009 Rexx Language Association. All rights reserved. */
5 /* */
6 /* This program and the accompanying materials are made available under */
7 /* the terms of the Common Public License v1.0 which accompanies this */
8 /* distribution. A copy is also available at the following address: */
9 /* http://www.ibm.com/developerworks/oss/CPLv1.0.htm */
10 /* */
11 /* Redistribution and use in source and binary forms, with or */
12 /* without modification, are permitted provided that the following */
13 /* conditions are met: */
14 /* */
15 /* Redistributions of source code must retain the above copyright */
16 /* notice, this list of conditions and the following disclaimer. */
17 /* Redistributions in binary form must reproduce the above copyright */
18 /* notice, this list of conditions and the following disclaimer in */
19 /* the documentation and/or other materials provided with the distribution. */
20 /* */
21 /* Neither the name of Rexx Language Association nor the names */
22 /* of its contributors may be used to endorse or promote products */
23 /* derived from this software without specific prior written permission. */
24 /* */
25 /* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS */
26 /* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT */
27 /* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS */
28 /* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT */
29 /* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
30 /* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED */
31 /* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, */
32 /* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY */
33 /* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */
34 /* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS */
35 /* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
36 /* */
37 /*----------------------------------------------------------------------------*/
38 
39 
40 #include "APIServer.hpp"
41 #include "APIServerInstance.hpp"
42 #include "APIServerThread.hpp"
43 #include <new>
44 #include "ServiceMessage.hpp"
45 #include "ServiceException.hpp"
46 #include <stdio.h>
47 
48 int rxapiCounter = 0; // bumped before every message read in processMessages(); printed in the _DEBUG trace lines
49 
50 /**
51  * Initialize the server side of the operation: set up the
    * communications pipe, create the server mutex and mark the
    * server as active.
    *
    * A heap-allocated ServiceException (SERVER_FAILURE) is thrown when
    * the server stream cannot be created.
52  */
54 {
55  // able to initialize our communications pipe?
57  {
58  throw new ServiceException(SERVER_FAILURE, "RexxAPIServer::initServer() Failure creating server stream");
59  }
60 
61  lock.create(); // create the mutex used by requestLock()/releaseLock().
62  serverActive = true; // the service loops keep running while this stays true
63 }
64 
65 
66 /**
67  * Do server shutdown processing and resource cleanup: close the
    * server stream and clear the active flag tested by the service loops.
68  */
70 {
71  // flip the sign over to the closed side: shut the server stream first.
72  server.close();
73  serverActive = false;
74 }
75 
76 
77 /**
78  * Accept client connections for as long as the server is active.
79  * Each connection is handed to a newly started APIServerThread; a
80  * NULL connection is treated as a resource failure and ends the loop.
81  */
83 {
84  ServiceMessage message; // NOTE(review): not referenced by the lines visible in this function
85 
86  while (serverActive)
87  {
88  // get a new connection.
89  SysServerConnection *connection = server.connect();
90  // we might have some terminated threads waiting
91  // for final resource cleanup...this is a good place to
92  // check for this.
94  // we have some sort of resource problem...force termination and shutdown.
95  if (connection == NULL)
96  {
97  break;
98  }
99  // create a new thread to service this client connection.
    // thread objects queued through sessionTerminated() are later deleted
    // by cleanupTerminatedSessions().
100  APIServerThread *thread = new APIServerThread(this, connection);
101  thread->start();
102  }
103 }
104 
105 /**
106  * Handle a session termination event by queueing the finished
    * thread for deferred cleanup.
107  *
108  * @param thread The terminated service thread; it is appended to
    *               terminatedThreads and released later by
    *               cleanupTerminatedSessions().
109  */
111 {
112  // the queue is shared with cleanupTerminatedSessions(), so hold the lock while touching it
113  ServerLock sl(this);
114  // add to the queue for cleanup on the next opportunity
115  terminatedThreads.push_back(thread);
116 }
117 
118 
119 
120 /**
121  * Cleanup the resources devoted to threads that have
122  * terminated. Every thread queued by sessionTerminated() is
    * terminated and deleted, leaving the queue empty.
123  */
125 {
126  // the queue is shared with sessionTerminated(), so hold the lock for the whole drain
127  ServerLock sl(this);
128 
129  // drain the queue of terminated threads, oldest entry first
130  while (!terminatedThreads.empty())
131  {
132  APIServerThread *thread = terminatedThreads.front();
133  terminatedThreads.pop_front();
134  // shut down the resources for this thread and release the memory
135  thread->terminate();
136  delete thread;
137  }
138 }
139 
// Expands to a switch case that returns the enumerator's own spelling as a
// string literal (via the preprocessor # operator); used by the *Text() helpers.
140 #define ENUM_TEXT(code) case code : return #code;
141 
/**
 * Return a printable name for a ServerManager value, for use in the
 * debug trace output.
 *
 * @param code The message target to name.
 *
 * @return The name text, or "<Unknown>" when no case matches.
 */
143 {
144  switch(code)
145  {
150  default: return "<Unknown>";
151  }
152 }
153 
/**
 * Return a printable name for a ServerOperation value, for use in the
 * debug trace output. The cases are grouped by the manager that
 * handles the operation.
 *
 * @param code The operation to name.
 *
 * @return The name text, or "<Unknown>" when no case matches.
 */
155 {
156  switch(code)
157  {
158  // macro space operations
172 
173  // queue manager operations
189 
190  // registration manager operations
199 
200  // global API operations
205 
206  default: return "<Unknown>";
207  }
208 }
209 
210 /**
211  * Process the Rexx API requests as a queue of messages. Each
212  * message is handled through to completion, so the message
213  * queue is the synchronization point.
214  */
216 {
217  while (serverActive)
218  {
219  ServiceMessage message;
220  try
221  {
222  // read the message.
223  rxapiCounter++;
224  message.readMessage(connection);
225  } catch (ServiceException *e)
226  {
227  // an error here is likely caused by the client closing the connection.
228  // delete both the exception and the connection and terminate the thread.
229 #ifdef _DEBUG
230  if (Utilities::traceConcurrency()) dbgprintf(CONCURRENCY_TRACE "...... ... ", Utilities::currentThreadId(), NULL, NULL, 0, ' ');
231  dbgprintf("(rxapi) %05i ServiceException caught\n", rxapiCounter);
232 #endif
233  delete e;
234  delete connection;
235  return;
236  }
237 
238 #ifdef _DEBUG
239  if (Utilities::traceConcurrency()) dbgprintf(CONCURRENCY_TRACE "...... ... ", Utilities::currentThreadId(), NULL, NULL, 0, ' ');
240  dbgprintf("(rxapi) %05i ServiceMessage %s %s\n", rxapiCounter, ServerManagerText(message.messageTarget), ServerOperationText(message.operation));
241  if (Utilities::traceConcurrency()) dbgprintf(CONCURRENCY_TRACE "...... ... ", Utilities::currentThreadId(), NULL, NULL, 0, ' ');
242  dbgprintf("(rxapi) %05i session=%i\n", rxapiCounter, message.session);
243  if (Utilities::traceConcurrency()) dbgprintf(CONCURRENCY_TRACE "...... ... ", Utilities::currentThreadId(), NULL, NULL, 0, ' ');
244  dbgprintf("(rxapi) %05i nameArg=%s\n", rxapiCounter, message.nameArg);
245  if (Utilities::traceConcurrency()) dbgprintf(CONCURRENCY_TRACE "...... ... ", Utilities::currentThreadId(), NULL, NULL, 0, ' ');
246  dbgprintf("(rxapi) %05i userid=%s\n", rxapiCounter, message.userid);
247 #endif
248  message.result = MESSAGE_OK; // unconditionally zero the result
249  try
250  {
251  // each target handles its own dispatch.
252  switch (message.messageTarget)
253  {
254  case QueueManager:
255  case RegistrationManager:
256  case MacroSpaceManager:
257  {
258  getInstance(message)->dispatch(message);
259  break;
260  }
261 
262  // general API control message.
263  case APIManager:
264  {
265  // this could be a shutdown operation
266  if (message.operation == CLOSE_CONNECTION)
267  {
268  connection->disconnect();
269  delete connection;
270  return;
271  }
272 
273  dispatch(message);
274  break;
275  }
276  }
277  } catch (std::bad_alloc &ba)
278  {
279  ba; // get rid of compile warning
280  // this catches any C++ memory allocation errors, which we'll just return into a
281  // memory failure result message.
282  message.result = SERVER_ERROR;
283 
284  }
285 
286  try
287  {
288  // ping the message back to the caller
289  message.writeResult(connection);
290  } catch (ServiceMessage *e)
291  {
292  // an error here is likely caused by the client closing the connection.
293  // delete both the exception and the connection and terminate the thread.
294  delete e;
295  delete connection;
296  return;
297  }
298  }
299 }
300 
301 
302 /**
303  * Dispatch an API server control message, i.e. one that
    * processMessages() routes to the APIManager target.
304  *
305  * @param message The control message. Its result is reset to MESSAGE_OK
    *                and then updated in place: CONNECTION_ACTIVE stores the
    *                API version in parameter1, and an unrecognized
    *                operation records a SERVER_FAILURE exception.
306  */
308 {
309  message.result = MESSAGE_OK;
310  switch (message.operation)
311  {
312  case SHUTDOWN_SERVER:
313  {
    // NOTE(review): the ServiceReturn from shutdown() is discarded, so the
    // result stays MESSAGE_OK even when the server was not stoppable.
314  shutdown();
315  break;
316  }
317 
318  // TODO: Make sure process cleanup is driven
319  case PROCESS_CLEANUP:
320  {
321  cleanupProcessResources(message);
322  break;
323  }
324 
325  // This is an "are you there" ping. Pass back the version information as a parameter.
326  case CONNECTION_ACTIVE:
327  {
328  message.parameter1 = REXXAPI_VERSION;
329  break;
330  }
331 
332  default:
333  message.setExceptionInfo(SERVER_FAILURE, "Invalid API manager operation");
334  break;
335  }
336 }
337 
338 /**
339  * Clean up session-specific resources after a Rexx process
340  * terminates. The work is delegated to the server instance that
    * getInstance() selects for the message.
341  *
342  * @param message The service message with the session information.
343  */
345 {
346  getInstance(message)->cleanupProcessResources(message);
347 }
348 
349 
350 /**
351  * Cause the API server to shutdown, provided it is currently stoppable.
352  *
353  * @return SERVER_STOPPED if a stoppage is possible, SERVER_NOT_STOPPABLE
354  * if it was not in a stoppable state.
355  */
357 {
358  // only stop when isStoppable() reports that no instance objects to it
359  if (isStoppable())
360  {
361  serverActive = false;
362  return SERVER_STOPPED;
363  }
364  return SERVER_NOT_STOPPABLE;
365 }
366 
367 /**
368  * Stop the server unconditionally; unlike shutdown(), no
    * isStoppable() check is made.
369  */
371 {
372  // clear the active flag; the service loops test it at the top of each pass.
    // NOTE(review): nothing here wakes a loop that is waiting inside
    // connect() or readMessage() -- confirm how those calls get interrupted.
373  serverActive = false;
374 }
375 
376 /**
377  * Test to see if the api server is in a state where it can be
378  * stopped. A stoppable state implies there are no session
379  * specific resources currently active in the server. Every
    * instance on the instances chain must agree.
380  *
381  * @return True if the server is stoppable, false otherwise.
382  */
384 {
    // NOTE(review): the chain is walked without a ServerLock, although
    // getInstance() takes one before modifying it -- confirm this is safe.
385  APIServerInstance *current = instances;
386  while (current != NULL)
387  {
388  if (!current->isStoppable())
389  {
    // a single busy instance vetoes the stop
390  return false;
391  }
392  current = current->next;
393  }
394  return true;
395 }
396 
397 
398 /**
399  * Get the server instance associated with a particular userid,
400  * creating a new instance if this is the first time we've processed
401  * a request for this id. The lookup and any insertion happen while
    * holding the server lock.
402  *
403  * @param m The service message associated with the request.
404  *
405  * @return A pointer to the correct instance.
406  */
408 {
409  // synchronize access on this
410  ServerLock sl(this);
411 
    // look for an existing instance that claims this message's user
412  APIServerInstance *current = instances;
413  while (current != NULL)
414  {
415  if (current->isUser(m))
416  {
417  return current;
418  }
419  current = current->next;
420  }
421 
    // first request for this user: create an instance and link it at the head of the chain
422  current = new APIServerInstance(m);
423  current->next = instances;
424  instances = current;
425  return current;
426 }
427 
428 
429 /**
430  * Request the global server API lock. Pair every call with
    * releaseLock().
431  */
433 {
    // presumably the string/0 arguments only label the request for tracing -- confirm against SysMutex
434  lock.request("APIServer::requestLock", 0);
435 }
436 
437 
438 /**
439  * Release the global server API lock obtained with requestLock().
440  */
442 {
    // presumably the string/0 arguments only label the release for tracing -- confirm against SysMutex
443  lock.release("APIServer::releaseLock", 0);
444 }
const char * ServerManagerText(ServerManager code)
Definition: APIServer.cpp:142
#define ENUM_TEXT(code)
Definition: APIServer.cpp:140
int rxapiCounter
Definition: APIServer.cpp:48
const char * ServerOperationText(ServerOperation code)
Definition: APIServer.cpp:154
@ SERVER_FAILURE
ServiceReturn
@ SERVER_ERROR
@ SERVER_STOPPED
@ MESSAGE_OK
@ SERVER_NOT_STOPPABLE
ServerOperation
@ CLEAR_MACRO_SPACE
@ GET_SESSION_QUEUE_COUNT
@ NEST_SESSION_QUEUE
@ CREATE_NAMED_QUEUE
@ QUERY_MACRO
@ CONNECTION_ACTIVE
@ REGISTER_LOAD_LIBRARY
@ NEXT_MACRO_DESCRIPTOR
@ REGISTER_QUERY_LIBRARY
@ MACRO_SEND_NEXT
@ CLEAR_SESSION_QUEUE
@ REGISTER_DROP_LIBRARY
@ REGISTER_ENTRYPOINT
@ GET_MACRO_DESCRIPTOR
@ DELETE_NAMED_QUEUE
@ GET_MACRO_IMAGE
@ ADD_TO_NAMED_QUEUE
@ PROCESS_CLEANUP
@ REGISTER_DROP
@ REGISTER_LIBRARY
@ REMOVE_MACRO
@ SHUTDOWN_SERVER
@ PULL_FROM_SESSION_QUEUE
@ ADD_TO_SESSION_QUEUE
@ QUERY_NAMED_QUEUE
@ OPEN_NAMED_QUEUE
@ REGISTER_QUERY
@ REORDER_MACRO
@ UPDATE_CALLBACK
@ NEXT_MACRO_IMAGE
@ ADD_MACRO
@ PULL_FROM_NAMED_QUEUE
@ CLEAR_NAMED_QUEUE
@ CLOSE_CONNECTION
@ DELETE_SESSION_QUEUE
@ GET_NAMED_QUEUE_COUNT
@ ITERATE_MACRO_DESCRIPTORS
@ ITERATE_MACROS
@ CREATE_SESSION_QUEUE
@ MACRO_RETRIEVE_NEXT
ServerManager
@ QueueManager
@ RegistrationManager
@ APIManager
@ MacroSpaceManager
#define REXX_API_PORT
@ REXXAPI_VERSION
#define CONCURRENCY_TRACE
Definition: Utilities.hpp:50
std::list< APIServerThread * > terminatedThreads
Definition: APIServer.hpp:78
void requestLock()
Definition: APIServer.cpp:432
SysMutex lock
Definition: APIServer.hpp:74
void initServer()
Definition: APIServer.cpp:53
ServiceReturn shutdown()
Definition: APIServer.cpp:356
void terminateServer()
Definition: APIServer.cpp:69
void releaseLock()
Definition: APIServer.cpp:441
APIServerInstance * getInstance(ServiceMessage &m)
Definition: APIServer.cpp:407
void processMessages(SysServerConnection *connection)
Definition: APIServer.cpp:215
void listenForConnections()
Definition: APIServer.cpp:82
virtual void stop()
Definition: APIServer.cpp:370
void sessionTerminated(APIServerThread *thread)
Definition: APIServer.cpp:110
SysServerStream server
Definition: APIServer.hpp:75
void cleanupProcessResources(ServiceMessage &message)
Definition: APIServer.cpp:344
virtual bool isStoppable()
Definition: APIServer.cpp:383
bool serverActive
Definition: APIServer.hpp:76
APIServerInstance * instances
Definition: APIServer.hpp:77
void dispatch(ServiceMessage &message)
Definition: APIServer.cpp:307
void cleanupTerminatedSessions()
Definition: APIServer.cpp:124
APIServerInstance * next
bool isUser(ServiceMessage &message)
void dispatch(ServiceMessage &message)
virtual bool isStoppable()
void cleanupProcessResources(ServiceMessage &message)
void setExceptionInfo(ErrorCode error, const char *message)
ServerManager messageTarget
void readMessage(SysServerConnection *server)
ServerOperation operation
char userid[MAX_USERID_LENGTH]
char nameArg[NAMESIZE]
void writeResult(SysServerConnection *server)
uintptr_t parameter1
ServiceReturn result
void release(const char *ds, int di)
void request(const char *ds, int di)
bool make(const char *)
SysServerConnection * connect()
void terminate()
static wholenumber_t currentThreadId()
static bool traceConcurrency()
void dbgprintf(const char *format,...)