/* * NMSTL, the Networking, Messaging, Servers, and Threading Library for C++ * Copyright (c) 2002 Massachusetts Institute of Technology * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include using namespace nmstl; using namespace std; // Input/output queue of subcomponent tqueue< pair > input; tqueue< pair > output; // Counter: unique ID for request static int counter = 1000; // Map of request ID -> result object typedef map > result_map; static result_map pending_results; bool my_event_loop(); // Sends a request to subcomponent and returns a result object result handle(string in) { int id = counter++; result ret = pending_results[id] = result( wrap(&my_event_loop) ); input.push(pair(id, in)); return ret; } // Waits for an event to come in; returns false if no more events will // *ever* come in. bool my_event_loop() { pair val; if (output.wait(val)) { int id = val.first; string s = val.second; result_map::iterator rptr = pending_results.find(id); if (rptr == pending_results.end()) { WARN << "Got response for a result I don't know about!"; } else { // N.B.: same as pending_results[id].post(s); rptr->second.post(s); // Now forget about that ID pending_results.erase(rptr); } return true; } else { cout << "Output queue has been closed." << endl; return false; } } class uppercase_thread : public thread { public: void run() { pair one; cout << *this << " beginning" << endl; while (input.wait(one)) { string s = one.second; cout << *this << " processing " << one.first << ":" << s << "..." << endl; for (unsigned int i = 0; i < s.length(); ++i) s[i] = toupper(s[i]); thread::sleep(ntime::msecs(200 * s.length())); cout << *this << " done processing " << s << "." << endl; output.push(pair(one.first, s)); } cout << "Input queue has been closed." << endl; output.close(); } }; void my_callback(result* out) { cout << "Callback: " << **out << endl; } int main() { uppercase_thread thr; thr.start(); // EXAMPLE 1 { cout << "Synchronous usage:" << endl; cout << handle("ABCDE").wait() << endl; cout << handle("QWERTYUIOP").wait() << endl; cout << handle("ZZZ").wait() << endl; } // EXAMPLE 2 { cout << "Synchronous usage, out of order:" << endl; result one = handle("ONE"); result two = handle("TWO"); cout << two.wait() << endl; cout << one.wait() << endl; } // EXAMPLE 3 { cout << "Asynchronous usage:" << endl; result out1 = handle("ABCDE").add(wrap(&my_callback)); result out2 = handle("QWERTYUIOP").add(wrap(&my_callback)); result out3 = handle("ZZZ").add(wrap(&my_callback)); input.close(); while (my_event_loop()) ; } // Close off thread thr.join(); }