Skip to content

Commit bd1b369

Browse files
committed
New WorkerThread.js apporoach using addChildApp
1 parent e322cd8 commit bd1b369

File tree

3 files changed

+93
-12
lines changed

3 files changed

+93
-12
lines changed

examples/WorkerThreads.js

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,61 @@
1-
/* This example spawns two worker threads, each with their own
2-
* server listening to the same port (Linux feature). */
1+
/* This example shows two different approaches to multi-core load balancing.
2+
* The first approach (the oldest) requires Linux and will only work on Linux.
3+
* This approach listens to port 4000 on all CPUs. That's it. That's all you do.
4+
* Listening to the same port from many worker threads will work on Linux.
5+
* The second approach will work on all platforms; you set up a main acceptorApp and register all child apps
6+
* (worker apps) with it. The acceptorApp will listen to port 9001 and move sockets in round-robin fashion to
7+
* the registered child apps.
8+
* Note that, in this example we only create 2 worker threads. Ideally you should create as many as there are CPUs
9+
* in your system. But by only creating 2 here, it is simple to see the perf. gain on a system of 4 cores, as you can then
10+
* run the client side on the remaining 2 cores without interfering with the server side. */
311

412
const uWS = require('../dist/uws.js');
513
const port = 9001;
6-
const { Worker, isMainThread, threadId } = require('worker_threads');
14+
const { Worker, isMainThread, threadId, parentPort } = require('worker_threads');
715
const os = require('os');
816

917
if (isMainThread) {
18+
19+
/* The acceptorApp only listens, but must be SSL if worker apps are SSL and likewise opposite */
20+
const acceptorApp = uWS./*SSL*/App({
21+
key_file_name: 'misc/key.pem',
22+
cert_file_name: 'misc/cert.pem',
23+
passphrase: '1234'
24+
}).listen(port, (token) => {
25+
if (token) {
26+
console.log('Listening to port ' + port + ' from thread ' + threadId + ' as main acceptor');
27+
} else {
28+
console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
29+
}
30+
});
31+
1032
/* Main thread loops over all CPUs */
1133
/* In this case we only spawn two (hardcoded) */
1234
/*os.cpus()*/[0, 1].forEach(() => {
35+
1336
/* Spawn a new thread running this source file */
14-
new Worker(__filename);
37+
new Worker(__filename).on("message", (workerAppDescriptor) => {
38+
acceptorApp.addChildAppDescriptor(workerAppDescriptor);
39+
});
1540
});
1641

1742
/* I guess main thread joins by default? */
1843
} else {
1944
/* Here we are inside a worker thread */
20-
const app = uWS.SSLApp({
45+
const app = uWS./*SSL*/App({
2146
key_file_name: 'misc/key.pem',
2247
cert_file_name: 'misc/cert.pem',
2348
passphrase: '1234'
2449
}).get('/*', (res, req) => {
2550
res.end('Hello Worker!');
26-
}).listen(port, (token) => {
27-
if (token) {
28-
console.log('Listening to port ' + port + ' from thread ' + threadId);
29-
} else {
30-
console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
31-
}
51+
}).listen(4000, (token) => {
52+
if (token) {
53+
console.log('Listening to port ' + 4000 + ' from thread ' + threadId);
54+
} else {
55+
console.log('Failed to listen to port ' + 4000 + ' from thread ' + threadId);
56+
}
3257
});
58+
59+
/* The worker sends back its descriptor to the main acceptor */
60+
parentPort.postMessage(app.getDescriptor());
3361
}

src/AppWrapper.h

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,54 @@ std::pair<uWS::SocketContextOptions, bool> readOptionsObject(const FunctionCallb
713713
return {options, true};
714714
}
715715

716+
template <typename APP>
717+
void uWS_App_addChildApp(const FunctionCallbackInfo<Value> &args) {
718+
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
719+
720+
Isolate *isolate = args.GetIsolate();
721+
722+
double descriptor = args[0]->NumberValue(isolate->GetCurrentContext()).ToChecked();
723+
724+
725+
APP *receivingApp;// = (APP *) args[0]->ToObject(isolate->GetCurrentContext()).ToLocalChecked()->GetAlignedPointerFromInternalField(0);
726+
727+
memcpy(&receivingApp, &descriptor, sizeof(receivingApp));
728+
729+
/* Todo: check the class type of args[0] must match class type of args.Holder() */
730+
//if (args[0])
731+
732+
//std::cout << "addChildApp: " << receivingApp << std::endl;
733+
734+
app->addChildApp(receivingApp);
735+
736+
args.GetReturnValue().Set(args.Holder());
737+
}
738+
739+
template <typename APP>
740+
void uWS_App_getDescriptor(const FunctionCallbackInfo<Value> &args) {
741+
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
742+
743+
Isolate *isolate = args.GetIsolate();
744+
745+
static_assert(sizeof(double) >= sizeof(app));
746+
747+
//static thread_local std::unordered_set<UniquePersistent<Object>> persistentApps;
748+
749+
UniquePersistent<Object> *persistentApp = new UniquePersistent<Object>;
750+
persistentApp->Reset(args.GetIsolate(), args.Holder());
751+
752+
//persistentApps.emplace(persistentApp);
753+
754+
double descriptor = 0;
755+
memcpy(&descriptor, &app, sizeof(app));
756+
757+
//std::cout << "getDescriptor: " << app << std::endl;
758+
759+
//std::cout << "Loop: " << app->getLoop() << std::endl;
760+
761+
args.GetReturnValue().Set(Number::New(isolate, descriptor));
762+
}
763+
716764
template <typename APP>
717765
void uWS_App_addServerName(const FunctionCallbackInfo<Value> &args) {
718766
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
@@ -920,6 +968,11 @@ void uWS_App(const FunctionCallbackInfo<Value> &args) {
920968
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "listen_unix", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_listen_unix<APP>, args.Data()));
921969
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "filter", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_filter<APP>, args.Data()));
922970

971+
/* load balancing */
972+
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "addChildAppDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_addChildApp<APP>, args.Data()));
973+
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_getDescriptor<APP>, args.Data()));
974+
975+
923976
/* ws, listen */
924977
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "ws", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_ws<APP>, args.Data()));
925978
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "publish", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_publish<APP>, args.Data()));

0 commit comments

Comments
 (0)