/// Timer-style callback scheduling shared between threads: `setTimeout`,
/// `setImmediate` and `clearTimeout` register/unregister callbacks, and
/// `parallelMain` is the loop that runs the callbacks once they are due.
module served.utils.async;

import core.sync.mutex : Mutex;
import core.time : Duration;

import served.utils.fibermanager;

import std.datetime.stopwatch : msecs, StopWatch;
import std.experimental.logger;

/// Optional hook used by `spawnFiber`. While it is `null`, `spawnFiber` falls
/// back to scheduling the callback through `setImmediate`.
__gshared void delegate(void delegate(), int pages, string file, int line) spawnFiberImpl;
/// Most recently issued timeout ID (incremented under `timeoutsMutex`).
__gshared int timeoutID;
/// Pending timeouts in no particular order (removal is done by swap-remove).
__gshared Timeout[] timeouts;
/// Guards `timeouts` and `timeoutID`. Created lazily on first use.
__gshared Mutex timeoutsMutex;

/// Returns `timeoutsMutex`, creating it first if it does not exist yet.
///
/// Creation happens inside an anonymous `synchronized` block so that two
/// threads racing on the very first call cannot end up with two different
/// mutexes. An already existing mutex is never replaced.
private Mutex ensureTimeoutsMutex()
{
	synchronized
	{
		if (timeoutsMutex is null)
			timeoutsMutex = new Mutex;
		return timeoutsMutex;
	}
}

/// Removes `timeouts[index]` by overwriting it with the last element and
/// shrinking the array by one. The vacated last slot is reset so the removed
/// callback is no longer referenced from the array's backing memory.
///
/// Must be called with `timeoutsMutex` held. Element order is not preserved.
private void removeTimeoutAt(size_t index)
{
	timeouts[index] = timeouts[$ - 1];
	timeouts[$ - 1] = Timeout.init;
	timeouts.length--;
}

/// Runs `cb` through `spawnFiberImpl` if that hook is set, otherwise schedules
/// it with `setImmediate`.
///
/// Params:
///   cb = the delegate to run.
///   pages = forwarded to `spawnFiberImpl`; unused in the fallback path.
///   file = forwarded to `spawnFiberImpl`; defaults to the caller's file.
///   line = forwarded to `spawnFiberImpl`; defaults to the caller's line.
void spawnFiber(void delegate() cb, int pages = 20, string file = __FILE__, int line = __LINE__)
{
	if (spawnFiberImpl)
		spawnFiberImpl(cb, pages, file, line);
	else
		setImmediate(cb);
}

/// Endless dispatch loop: on every iteration collects the callbacks of all
/// due timeouts (at most 32 per iteration), removes them from `timeouts`,
/// invokes them outside the lock and then yields the current fiber.
///
/// Never returns normally; an exception thrown by a callback propagates out
/// of this function.
// Called at most 100x per second
void parallelMain()
{
	void delegate()[32] callsBuf;
	void delegate()[] calls;
	while (true)
	{
		synchronized (ensureTimeoutsMutex())
		{
			// foreach_reverse iterates over a snapshot of the slice, and the
			// swap-remove only ever moves an element from an index that has
			// already been visited, so removing while iterating is safe.
			foreach_reverse (i, ref timeout; timeouts)
			{
				if (timeout.sw.peek >= timeout.timeout)
				{
					timeout.sw.stop();
					trace("Calling timeout");
					callsBuf[calls.length] = timeout.callback;
					calls = callsBuf[0 .. calls.length + 1];
					removeTimeoutAt(i);

					// Buffer is full; remaining due timeouts are picked up on
					// the next iteration.
					if (calls.length >= callsBuf.length)
						break;
				}
			}
		}

		// Callbacks run without the lock so they may call setTimeout or
		// clearTimeout themselves.
		foreach (call; calls)
			call();

		// Drop references to the delegates that were just run.
		callsBuf[] = null;
		calls = null;
		Fiber.yield();
	}
}

/// A pending timeout registered through `setTimeout`.
struct Timeout
{
	/// Started when the timeout is registered; compared against `timeout`.
	StopWatch sw;
	/// Delay after which `callback` becomes due.
	Duration timeout;
	/// Delegate invoked by `parallelMain` once the timeout is due.
	void delegate() callback;
	/// ID returned by `setTimeout`, usable with `clearTimeout`.
	int id;
}

/// Schedules `callback` to be run by `parallelMain` after `ms` milliseconds.
/// Returns: the timeout ID, usable with `clearTimeout`.
int setTimeout(void delegate() callback, int ms)
{
	return setTimeout(callback, ms.msecs);
}

/// Schedules `callback` with a zero delay, i.e. it is due the next time
/// `parallelMain` checks the pending timeouts.
void setImmediate(void delegate() callback)
{
	setTimeout(callback, 0);
}

/// Schedules `callback` to be run by `parallelMain` once `timeout` has
/// elapsed. Safe to call before `parallelMain` has started.
/// Returns: the timeout ID, usable with `clearTimeout`.
int setTimeout(void delegate() callback, Duration timeout)
{
	trace("Setting timeout for ", timeout);
	Timeout to;
	to.timeout = timeout;
	to.callback = callback;
	to.sw.start();
	synchronized (ensureTimeoutsMutex())
	{
		to.id = ++timeoutID;
		timeouts ~= to;
	}
	return to.id;
}

/// Cancels the pending timeout with the given `id`. Does nothing if no such
/// timeout is pending (for example because it has already been dispatched).
void clearTimeout(int id)
{
	synchronized (ensureTimeoutsMutex())
	{
		foreach_reverse (i, ref timeout; timeouts)
		{
			if (timeout.id == id)
			{
				timeout.sw.stop();
				removeTimeoutAt(i);
				return;
			}
		}
	}
}