module served.utils.fibermanager;

// debug = Fibers;

import core.thread;

import std.algorithm;
import std.experimental.logger;
import std.range;

import served.utils.memory;

version (Have_workspace_d)
{
	import workspaced.api : Future;

	enum hasFuture = true;
}
else
	enum hasFuture = false;

public import core.thread : Fiber, Thread;

/// Keeps a list of fibers and drives them cooperatively: every invocation of
/// `call` resumes each fiber that has not terminated yet exactly once.
struct FiberManager
{
	/// Fibers currently owned by this manager.
	private Fiber[] fibers;

	/// Runs one scheduling round.
	///
	/// Every fiber that is not in the `TERM` state is resumed once. Fibers that
	/// were already in the `TERM` state when visited are handed to
	/// `destroyUnset` and dropped from the list afterwards; a fiber that
	/// finishes during this round is therefore only released by the next round.
	void call()
	{
		size_t[] finished;
		foreach (index, current; fibers)
		{
			if (current.state != Fiber.State.TERM)
			{
				current.call();
				continue;
			}
			finished ~= index;
		}

		// Walk the recorded indices back to front so that removing one entry
		// does not shift the positions of the entries still to be removed.
		foreach_reverse (index; finished)
		{
			debug (Fibers)
				tracef("Releasing fiber %s", cast(void*) fibers[index]);
			destroyUnset(fibers[index]);
			fibers = fibers.remove(index);
		}
	}

	/// Number of fibers currently held by this manager.
	@property size_t length() const
	{
		return fibers.length;
	}

	/// Makes a fiber call alongside other fibers with this manager. This transfers the full memory ownership to the manager.
	/// Fibers should no longer be accessed when terminating.
	/// Params:
	///   fiber = the fiber to schedule.
	///   file = source file of the caller, only used for `debug (Fibers)` tracing.
	///   line = source line of the caller, only used for `debug (Fibers)` tracing.
	void put(Fiber fiber, string file = __FILE__, int line = __LINE__)
	{
		debug (Fibers)
			tracef("Putting fiber %s in %s:%s", cast(void*) fiber, file, line);
		assumeSafeAppend(fibers) ~= fiber;
	}

	/// ditto
	void opOpAssign(string op : "~")(Fiber fiber, string file = __FILE__, int line = __LINE__)
	{
		put(fiber, file, line);
	}
}

/// Evaluates to `true` if at least one of the types in `Args` is an input range.
private template hasInputRanges(Args...)
{
	static if (Args.length == 0)
		enum hasInputRanges = false;
	else
		enum hasInputRanges = isInputRange!(Args[$ - 1])
			|| hasInputRanges!(Args[0 .. $ - 1]);
}

// ridiculously high fiber size (192 KiB per fiber to create), but for parsing big files this is needed to not segfault in libdparse
/// Runs all given work items side by side and returns once all of them are done.
///
/// Each argument may be a `Fiber`, a `Future` (only when built with
/// workspace-d), anything else accepted by the `Fiber` constructor, or an
/// input range of such items. Items that are not fibers already are wrapped in
/// a new `Fiber` with a stack size of `fiberSize`.
///
/// Between scheduling rounds this function calls `Fiber.yield()`, so it is
/// presumably meant to be invoked from inside a running fiber.
/// Params:
///   fiberSize = stack size passed to every `Fiber` created here.
///   fibers = the work items to run to completion.
void joinAll(size_t fiberSize = 4096 * 48, Fibers...)(Fibers fibers)
{
	FiberManager manager;

	static if (!hasInputRanges!Fibers)
	{
		// Exactly one fiber per argument, so a fixed-size array is sufficient.
		Fiber[Fibers.length] pending;
		int filled;

		void enqueue(Fiber fiber)
		{
			pending[filled] = fiber;
			filled++;
		}
	}
	else
	{
		// Ranges contribute an unknown number of fibers: use a growable array.
		Fiber[] pending;
		pending.reserve(Fibers.length);

		void enqueue(Fiber fiber)
		{
			pending ~= fiber;
		}
	}

	foreach (arg; fibers)
	{
		alias Arg = typeof(arg);

		static if (!isInputRange!Arg)
		{
			static if (is(Arg : Fiber))
				enqueue(arg);
			else static if (hasFuture && is(Arg : Future!T, T))
				enqueue(new Fiber(&arg.getYield, fiberSize));
			else
				enqueue(new Fiber(arg, fiberSize));
		}
		else
		{
			foreach (element; arg)
			{
				alias Element = typeof(element);

				static if (is(Element : Fiber))
					enqueue(element);
				else static if (hasFuture && is(Element : Future!T, T))
					enqueue(new Fiber(&element.getYield, fiberSize));
				else
					enqueue(new Fiber(element, fiberSize));
			}
		}
	}

	manager.fibers = pending[];

	// Keep running rounds until the manager has released every fiber.
	while (manager.length != 0)
	{
		manager.call();
		Fiber.yield();
	}
}