1 module workspaced.future; 2 3 import core.time; 4 5 import std.parallelism; 6 import std.traits : isCallable; 7 8 class Future(T) 9 { 10 import core.thread : Fiber, Thread; 11 12 static if (!is(T == void)) 13 T value; 14 Throwable exception; 15 bool has; 16 void delegate() _onDone; 17 private Thread _worker; 18 19 /// Sets the onDone callback if no value has been set yet or calls immediately if the value has already been set or was set during setting the callback. 20 /// Crashes with an assert error if attempting to override an existing callback (i.e. calling this function on the same object twice). 21 void onDone(void delegate() callback) @property 22 { 23 assert(!_onDone); 24 if (has) 25 callback(); 26 else 27 { 28 bool called; 29 _onDone = { called = true; callback(); }; 30 if (has && !called) 31 callback(); 32 } 33 } 34 35 static if (is(T == void)) 36 static Future!void finished() 37 { 38 auto ret = new typeof(return); 39 ret.has = true; 40 return ret; 41 } 42 else 43 static Future!T fromResult(T value) 44 { 45 auto ret = new typeof(return); 46 ret.value = value; 47 ret.has = true; 48 return ret; 49 } 50 51 static Future!T async(T delegate() cb) 52 { 53 auto ret = new typeof(return); 54 ret._worker = new Thread({ 55 try 56 { 57 static if (is(T == void)) 58 { 59 cb(); 60 ret.finish(); 61 } 62 else 63 ret.finish(cb()); 64 } 65 catch (Throwable t) 66 { 67 ret.error(t); 68 } 69 }).start(); 70 return ret; 71 } 72 73 static Future!T fromError(T)(Throwable error) 74 { 75 auto ret = new typeof(return); 76 ret.error = error; 77 ret.has = true; 78 return ret; 79 } 80 81 static if (is(T == void)) 82 void finish() 83 { 84 assert(!has); 85 has = true; 86 if (_onDone) 87 _onDone(); 88 } 89 else 90 void finish(T value) 91 { 92 assert(!has); 93 this.value = value; 94 has = true; 95 if (_onDone) 96 _onDone(); 97 } 98 99 void error(Throwable t) 100 { 101 assert(!has); 102 exception = t; 103 has = true; 104 if (_onDone) 105 _onDone(); 106 } 107 108 /// Waits for the result of this future using Thread.sleep 109 T getBlocking(alias sleepDur = 1.msecs)() 110 { 111 while (!has) 112 Thread.sleep(sleepDur); 113 if (_worker) 114 { 115 _worker.join(); 116 _worker = null; 117 } 118 if (exception) 119 throw exception; 120 static if (!is(T == void)) 121 return value; 122 } 123 124 /// Waits for the result of this future using Fiber.yield 125 T getYield() 126 { 127 assert(Fiber.getThis() !is null, 128 "Attempted to getYield without being in a Fiber context"); 129 130 while (!has) 131 Fiber.yield(); 132 if (_worker) 133 { 134 _worker.join(); 135 _worker = null; 136 } 137 if (exception) 138 throw exception; 139 static if (!is(T == void)) 140 return value; 141 } 142 } 143 144 enum string gthreadsAsyncProxy(string call) = `auto __futureRet = new typeof(return); 145 gthreads.create({ 146 mixin(traceTask); 147 try 148 { 149 __futureRet.finish(` ~ call ~ `); 150 } 151 catch (Throwable t) 152 { 153 __futureRet.error(t); 154 } 155 }); 156 return __futureRet; 157 `; 158 159 void create(T)(TaskPool pool, T fun) if (isCallable!T) 160 { 161 pool.put(task(fun)); 162 }