module served.utils.async;

import core.sync.mutex : Mutex;
import core.time : Duration;

import served.utils.fibermanager;

import std.datetime.stopwatch : msecs, StopWatch;
import std.experimental.logger;

/// Optional hook used by `spawnFiber` to start `cb` with the given page count
/// and call-site information. When it is null, `spawnFiber` falls back to
/// `setImmediate`.
__gshared void delegate(void delegate(), int pages, string file, int line) spawnFiberImpl;
/// Page count used by the `spawnFiber` overload that does not take `pages`.
__gshared int defaultFiberPages = 20;
/// Most recently issued timeout ID; incremented while holding the timeouts lock.
__gshared int timeoutID;
/// Pending timeouts in no particular order (removal swaps the last entry in).
__gshared Timeout[] timeouts;
/// Lock guarding `timeouts` and `timeoutID`. Created lazily on first use.
__gshared Mutex timeoutsMutex;

/// Returns the lock guarding `timeouts`, creating it on first use.
///
/// `setTimeout`, `setImmediate`, `clearTimeout` and the `spawnFiber` fallback
/// can all be reached before `parallelMain` has started; without lazy creation
/// they would run `synchronized` on a null mutex.
private Mutex timeoutLock()
{
	import std.concurrency : initOnce;

	return initOnce!timeoutsMutex(new Mutex);
}

/// Removes `timeouts[index]` by overwriting it with the last entry and
/// shrinking the array by one. Does not preserve order.
/// The caller must hold the timeouts lock.
private void removeTimeoutAt(size_t index)
{
	if (timeouts.length > 1)
		timeouts[index] = timeouts[$ - 1];
	timeouts.length--;
}

/// Spawns `cb` using `defaultFiberPages` pages.
void spawnFiber(void delegate() cb, string file = __FILE__, int line = __LINE__)
{
	spawnFiber(cb, defaultFiberPages, file, line);
}

/// Hands `cb` to `spawnFiberImpl` if one is installed; otherwise schedules it
/// through `setImmediate` (in which case `pages`, `file` and `line` are unused).
void spawnFiber(void delegate() cb, int pages, string file = __FILE__, int line = __LINE__)
{
	if (spawnFiberImpl)
		spawnFiberImpl(cb, pages, file, line);
	else
		setImmediate(cb);
}

/// Timer loop: on every pass collects the callbacks of expired timeouts,
/// removes them from `timeouts`, runs them, and then yields.
///
/// Never returns. Expired timeouts are collected while scanning `timeouts`
/// from the back, and at most 32 are dispatched per pass; the rest stay queued
/// for a later pass.
// Called at most 100x per second
void parallelMain()
{
	// Make sure the lock exists before the first scan (it may already have
	// been created by an earlier setTimeout/clearTimeout call).
	timeoutLock();

	void delegate()[32] callsBuf;
	void delegate()[] calls;
	while (true)
	{
		synchronized (timeoutsMutex)
			foreach_reverse (i, ref timeout; timeouts)
			{
				if (timeout.sw.peek >= timeout.timeout)
				{
					timeout.sw.stop();
					trace("Calling timeout");
					// Copy the callback out before the slot is overwritten.
					callsBuf[calls.length] = timeout.callback;
					calls = callsBuf[0 .. calls.length + 1];
					// Scanning backwards means the entry swapped into slot
					// `i` has already been examined during this pass.
					removeTimeoutAt(i);

					if (calls.length >= callsBuf.length)
						break;
				}
			}

		// Run callbacks outside the lock so they can call setTimeout or
		// clearTimeout themselves.
		foreach (call; calls)
			call();

		// Drop delegate references so the buffer does not keep them alive.
		callsBuf[] = null;
		calls = null;
		// NOTE(review): `Fiber` is not imported explicitly here; presumably it
		// is made visible by served.utils.fibermanager - confirm.
		Fiber.yield();
	}
}

/// A pending timer entry.
struct Timeout
{
	/// Started when the timeout is registered; compared against `timeout`.
	StopWatch sw;
	/// Delay after which `callback` becomes due.
	Duration timeout;
	/// Delegate to invoke once the timeout has expired.
	void delegate() callback;
	/// Identifier returned by `setTimeout` and accepted by `clearTimeout`.
	int id;
}

/// Registers `callback` to run once `ms` milliseconds have elapsed.
/// Returns: the ID of the new timeout, usable with `clearTimeout`.
int setTimeout(void delegate() callback, int ms)
{
	return setTimeout(callback, ms.msecs);
}

/// Registers `callback` with a zero delay, so it is due the next time
/// `parallelMain` scans the timeouts.
void setImmediate(void delegate() callback)
{
	setTimeout(callback, 0);
}

/// Registers `callback` to run once `timeout` has elapsed.
///
/// Safe to call before `parallelMain` has started: the timeouts lock is
/// created on demand.
/// Returns: the ID of the new timeout, usable with `clearTimeout`.
int setTimeout(void delegate() callback, Duration timeout)
{
	trace("Setting timeout for ", timeout);
	Timeout to;
	to.timeout = timeout;
	to.callback = callback;
	to.sw.start();
	synchronized (timeoutLock())
	{
		to.id = ++timeoutID;
		timeouts ~= to;
	}
	return to.id;
}

/// Cancels the pending timeout with the given `id`.
/// Does nothing if no pending timeout has that ID.
void clearTimeout(int id)
{
	synchronized (timeoutLock())
		foreach_reverse (i, ref timeout; timeouts)
		{
			if (timeout.id == id)
			{
				timeout.sw.stop();
				removeTimeoutAt(i);
				return;
			}
		}
}