introduce EsWorkQueue

This commit is contained in:
nakst 2021-09-17 18:58:03 +01:00
parent be09af9014
commit 069eb7754f
8 changed files with 84 additions and 59 deletions

View File

@ -247,12 +247,6 @@ Array<String> bookmarks;
Array<FolderViewSettingsEntry> folderViewSettings;
HashStore<uint64_t, Thumbnail> thumbnailCache;
// Non-blocking task thread.
EsHandle nonBlockingTaskWorkAvailable;
EsMutex nonBlockingTaskMutex;
Array<Task *> nonBlockingTasks;
// Styles.
const EsStyle styleFolderView = {
@ -346,41 +340,20 @@ void BlockingTaskQueue(Instance *instance, Task task) {
}
}
void NonBlockingTaskThread(EsGeneric) {
while (true) {
EsWait(&nonBlockingTaskWorkAvailable, 1, ES_WAIT_NO_TIMEOUT);
while (true) {
EsMutexAcquire(&nonBlockingTaskMutex);
if (!nonBlockingTasks.Length()) {
EsMutexRelease(&nonBlockingTaskMutex);
break;
}
Task *task = nonBlockingTasks[0];
nonBlockingTasks.Delete(0);
EsMutexRelease(&nonBlockingTaskMutex);
task->callback(nullptr, task);
EsMessage m = { MESSAGE_NON_BLOCKING_TASK_COMPLETE };
m.user.context2.p = task;
EsMessagePost(nullptr, &m);
}
}
void NonBlockingTaskWrapper(EsGeneric _task) {
Task *task = (Task *) _task.p;
task->callback(nullptr, task);
EsMessage m = { MESSAGE_NON_BLOCKING_TASK_COMPLETE };
m.user.context2.p = task;
EsMessagePost(nullptr, &m);
}
void NonBlockingTaskQueue(Task _task) {
// NOTE We can't store instances in tasks on the non-blocking queue thread,
// because the instances might be destroyed while the task is in progress!
Task *task = (Task *) EsHeapAllocate(sizeof(Task), false);
EsMemoryCopy(task, &_task, sizeof(Task));
EsMutexAcquire(&nonBlockingTaskMutex);
nonBlockingTasks.Add(task);
EsMutexRelease(&nonBlockingTaskMutex);
EsEventSet(nonBlockingTaskWorkAvailable);
EsWorkQueue(NonBlockingTaskWrapper, task);
}
void NonBlockingTaskComplete(EsMessage *message) {
@ -515,15 +488,6 @@ void _start() {
DriveAdd(prefix, prefixBytes);
}, 0);
// Start the non-blocking task threads.
nonBlockingTaskWorkAvailable = EsEventCreate(true /* autoReset */);
EsThreadInformation _nonBlockingTaskThread = {};
for (uintptr_t i = 0; i < EsSystemGetOptimalWorkQueueThreadCount(); i++) {
EsThreadCreate(NonBlockingTaskThread, &_nonBlockingTaskThread, nullptr);
}
// Process messages.
while (true) {
@ -557,7 +521,6 @@ void _start() {
}
EsAssert(!instances.Length());
EsHandleClose(nonBlockingTaskWorkAvailable);
EsHeapFree(fileTypesBuffer.out);
bookmarks.Free();
@ -568,7 +531,6 @@ void _start() {
knownFileTypes.Free();
knownFileTypesByExtension.Free();
loadedFolders.Free();
nonBlockingTasks.Free();
thumbnailCache.Free();
#endif
} else if (message->type == ES_MSG_REGISTER_FILE_SYSTEM) {

View File

@ -94,7 +94,7 @@ struct ThreadLocalStorage {
// This must be the first field.
ThreadLocalStorage *self;
uint64_t id;
EsObjectID id;
};
struct MountPoint : EsMountPoint {
@ -109,6 +109,11 @@ struct Timer {
EsGeneric argument;
};
struct Work {
EsWorkCallback callback;
EsGeneric context;
};
struct {
Array<EsSystemConfigurationGroup> systemConfigurationGroups;
EsMutex systemConfigurationMutex;
@ -136,6 +141,10 @@ struct {
uintptr_t performanceTimerStackCount;
ThreadLocalStorage firstThreadLocalStorage;
EsHandle workAvailable;
EsMutex workMutex;
Array<Work> workQueue;
} api;
ptrdiff_t tlsStorageOffset;
@ -866,6 +875,9 @@ EsMessage *EsMessageReceive() {
gui.allWindows.Free();
calculator.Free();
HashTableFree(&gui.keyboardShortcutNames, false);
EsHandleClose(api.workAvailable); // TODO Waiting for all work to finish.
EsAssert(!api.workQueue.Length());
api.workQueue.Free();
MemoryLeakDetectorCheckpoint(&heap);
EsPrint("ES_MSG_APPLICATION_EXIT - Heap allocation count: %d (%d from malloc).\n", heap.allocationsCount, mallocCount);
#endif
@ -1259,7 +1271,7 @@ uintptr_t EsSystemGetOptimalWorkQueueThreadCount() {
void ThreadInitialise(ThreadLocalStorage *local) {
EsMemoryZero(local, sizeof(ThreadLocalStorage));
local->id = EsSyscall(ES_SYSCALL_THREAD_GET_ID, ES_CURRENT_THREAD, 0, 0, 0);
EsSyscall(ES_SYSCALL_THREAD_GET_ID, ES_CURRENT_THREAD, (uintptr_t) &local->id, 0, 0);
local->self = local;
EsSyscall(ES_SYSCALL_PROCESS_SET_TLS, (uintptr_t) local - tlsStorageOffset, 0, 0, 0);
}
@ -1881,6 +1893,44 @@ void EsTimerCancel(EsTimer id) {
EsMutexRelease(&api.timersMutex);
}
void WorkThread(EsGeneric) {
while (true) {
EsWait(&api.workAvailable, 1, ES_WAIT_NO_TIMEOUT);
while (true) {
EsMutexAcquire(&api.workMutex);
if (api.workQueue.Length()) {
Work work = api.workQueue[0];
api.workQueue.Delete(0);
EsMutexRelease(&api.workMutex);
work.callback(work.context);
} else {
EsMutexRelease(&api.workMutex);
}
}
}
}
void EsWorkQueue(EsWorkCallback callback, EsGeneric context) {
EsMutexAcquire(&api.workMutex);
if (!api.workAvailable) {
api.workAvailable = EsEventCreate(true /* autoReset */);
EsThreadInformation thread = {};
for (uintptr_t i = 0; i < EsSystemGetOptimalWorkQueueThreadCount(); i++) {
EsThreadCreate(WorkThread, &thread, nullptr);
EsHandleClose(thread.handle);
}
}
Work work = { callback, context };
api.workQueue.Add(work);
EsMutexRelease(&api.workMutex);
EsEventSet(api.workAvailable);
}
#ifndef ENABLE_POSIX_SUBSYSTEM
void EsPOSIXInitialise(int *, char ***) {
while (true) {

View File

@ -1905,6 +1905,7 @@ function_pointer void EsListViewEnumerateVisibleItemsCallback(EsListView *view,
function_pointer void EsFontEnumerationCallback(const EsFontInformation *information, EsGeneric context);
function_pointer void EsUserTaskCallback(EsUserTask *task, EsGeneric data);
function_pointer bool EsFileCopyCallback(EsFileOffset bytesCopied, EsFileOffset totalBytes, EsGeneric data); // Return false to cancel.
function_pointer void EsWorkCallback(EsGeneric context);
// System.
@ -1985,16 +1986,19 @@ function EsError EsDeviceControl(EsHandle handle, EsDeviceControlType type, void
function EsError EsProcessCreate(EsProcessCreationArguments *arguments, EsProcessInformation *information);
function int EsProcessGetExitStatus(EsHandle process);
function EsObjectID EsProcessGetID(EsHandle process);
function EsObjectID EsProcessGetID(EsHandle process);
function void EsProcessGetState(EsHandle process, EsProcessState *state);
function EsHandle EsProcessOpen(EsObjectID pid);
function void EsProcessPause(EsHandle process, bool resume);
function void EsProcessTerminate(EsHandle process, int status);
function void EsProcessTerminateCurrent();
function EsError EsThreadCreate(EsThreadEntryCallback entryFunction, EsThreadInformation *information, EsGeneric argument);
function EsObjectID EsThreadGetID(EsHandle thread); // TODO Make this 64-bit.
function EsObjectID EsThreadGetID(EsHandle thread);
function void EsThreadTerminate(EsHandle thread);
function void EsWorkQueue(EsWorkCallback callback, EsGeneric context);
// Memory.
function void *EsArenaAllocate(EsArena *arena, bool zero); // Not thread-safe.

View File

@ -257,7 +257,9 @@ long EsPOSIXSystemCall(long n, long a1, long a2, long a3, long a4, long a5, long
case SYS_getpid: {
// Run the system call directly, so that the kernel can handle the vfork()'d case.
returnValue = EsSyscall(ES_SYSCALL_THREAD_GET_ID, ES_CURRENT_PROCESS, 0, 0, 0);
EsObjectID id;
EsSyscall(ES_SYSCALL_THREAD_GET_ID, ES_CURRENT_PROCESS, (uintptr_t) &id, 0, 0);
returnValue = id;
} break;
case SYS_gettid: {
@ -484,7 +486,7 @@ long EsPOSIXSystemCall(long n, long a1, long a2, long a3, long a4, long a5, long
case SYS_prlimit64: {
// You can't access other process's resources.
if (a1 && a1 != (long) EsSyscall(ES_SYSCALL_THREAD_GET_ID, ES_CURRENT_PROCESS, 0, 0, 0)) {
if (a1 && a1 != (long) EsProcessGetID(ES_CURRENT_PROCESS)) {
returnValue = -EPERM;
break;
}

View File

@ -436,16 +436,20 @@ void EsProcessPause(EsHandle process, bool resume) {
EsSyscall(ES_SYSCALL_PROCESS_PAUSE, process, resume, 0, 0);
}
uint64_t EsThreadGetID(EsHandle thread) {
EsObjectID EsThreadGetID(EsHandle thread) {
if (thread == ES_CURRENT_THREAD) {
return GetThreadLocalStorage()->id;
} else {
return EsSyscall(ES_SYSCALL_THREAD_GET_ID, thread, 0, 0, 0);
EsObjectID id;
EsSyscall(ES_SYSCALL_THREAD_GET_ID, thread, (uintptr_t) &id, 0, 0);
return id;
}
}
uintptr_t EsProcessGetID(EsHandle process) {
return EsSyscall(ES_SYSCALL_THREAD_GET_ID, process, 0, 0, 0);
EsObjectID EsProcessGetID(EsHandle process) {
EsObjectID id;
EsSyscall(ES_SYSCALL_THREAD_GET_ID, process, (uintptr_t) &id, 0, 0);
return id;
}
ptrdiff_t EsDirectoryEnumerateChildrenFromHandle(EsHandle directory, EsDirectoryChild *buffer, size_t size) {

View File

@ -164,6 +164,8 @@ EsError KLoadELF(KNode *node, KLoadedExecutable *executable) {
#ifdef ARCH_X86_64
uint64_t name = CalculateCRC64(EsLiteral("$Executables/x86_64"));
#else
#error Unimplemented.
#endif
BundleFile *files = (BundleFile *) ((BundleHeader *) header.mapAddress + 1);

View File

@ -1253,20 +1253,20 @@ SYSCALL_IMPLEMENT(ES_SYSCALL_THREAD_GET_ID) {
SYSCALL_HANDLE_2(argument0, KERNEL_OBJECT_THREAD | KERNEL_OBJECT_PROCESS, object);
if (object.type == KERNEL_OBJECT_THREAD) {
SYSCALL_RETURN(((Thread *) object.object)->id, false);
SYSCALL_WRITE(argument1, &((Thread *) object.object)->id, sizeof(EsObjectID));
} else if (object.type == KERNEL_OBJECT_PROCESS) {
Process *process = (Process *) object.object;
EsObjectID id = process->id;
#ifdef ENABLE_POSIX_SUBSYSTEM
if (currentThread->posixData && currentThread->posixData->forkProcess) {
SYSCALL_RETURN(currentThread->posixData->forkProcess->id, false);
id = currentThread->posixData->forkProcess->id;
}
#endif
SYSCALL_RETURN(process->id, false);
SYSCALL_WRITE(argument1, &id, sizeof(EsObjectID));
}
KernelPanic("ES_SYSCALL_THREAD_GET_ID - Unhandled case.\n");
SYSCALL_RETURN(ES_SUCCESS, false);
}

View File

@ -453,3 +453,4 @@ EsIconDisplaySetIcon=451
EsFontDatabaseLookupByName=452
_EsWindowGetHandle=453
_EsUISetFont=454
EsWorkQueue=455