1 module served.utils.fibermanager;
2 
3 // debug = Fibers;
4 
5 import core.thread;
6 
7 import std.algorithm;
8 import std.experimental.logger;
9 import std.range;
10 
11 import served.utils.memory;
12 
13 version (Have_workspace_d)
14 {
15 	import workspaced.api : Future;
16 
17 	enum hasFuture = true;
18 }
19 else
20 	enum hasFuture = false;
21 
22 public import core.thread : Fiber, Thread;
23 
/// Cooperative scheduler that owns a list of fibers and resumes each of them
/// once per `call`, releasing fibers after they have terminated.
struct FiberManager
{
	/// Fibers currently owned by this manager, in the order they were added.
	private Fiber[] fibers;

	/// Performs one scheduling pass: every fiber that has not terminated is
	/// resumed once. Fibers that were already terminated when the pass reached
	/// them are released via `destroyUnset` and dropped from the list afterwards.
	void call()
	{
		// Work on a snapshot of the slice so that fibers added while this pass
		// is running are only visited on the next pass.
		auto snapshot = fibers;
		size_t[] finished;
		foreach (idx; 0 .. snapshot.length)
		{
			auto current = snapshot[idx];
			if (current.state != Fiber.State.TERM)
			{
				current.call();
				continue;
			}
			finished ~= idx;
		}

		// Walk the collected indices back to front so that removing one entry
		// never shifts an index that still has to be processed.
		foreach (idx; finished.retro)
		{
			debug (Fibers)
				tracef("Releasing fiber %s", cast(void*) fibers[idx]);
			destroyUnset(fibers[idx]);
			fibers = fibers.remove(idx);
		}
	}

	/// Returns: the number of fibers currently held by this manager.
	@property size_t length() const
	{
		return fibers.length;
	}

	/// Makes a fiber call alongside other fibers with this manager. This
	/// transfers the full memory ownership to the manager.
	/// Fibers should no longer be accessed when terminating.
	/// Params:
	///   fiber = the fiber to schedule
	///   file = source file of the caller, only used for debug tracing
	///   line = source line of the caller, only used for debug tracing
	void put(Fiber fiber, string file = __FILE__, int line = __LINE__)
	{
		debug (Fibers)
			tracef("Putting fiber %s in %s:%s", cast(void*) fiber, file, line);
		assumeSafeAppend(fibers);
		fibers ~= fiber;
	}

	/// ditto
	void opOpAssign(string op : "~")(Fiber fiber, string file = __FILE__, int line = __LINE__)
	{
		this.put(fiber, file, line);
	}
}
67 
/// Evaluates to `true` if at least one of `Args` satisfies `isInputRange`,
/// `false` otherwise (including for an empty argument list).
private template hasInputRanges(Args...)
{
	static if (Args.length > 0)
	{
		// Inspect the last argument first, then recurse on everything before it.
		alias Last = Args[$ - 1];
		alias Rest = Args[0 .. $ - 1];

		static if (isInputRange!Last)
			enum hasInputRanges = true;
		else
			enum hasInputRanges = hasInputRanges!Rest;
	}
	else
		enum hasInputRanges = false;
}
77 
// ridiculously high fiber size (192 KiB per fiber to create), but for parsing big files this is needed to not segfault in libdparse
/// Runs all given tasks concurrently as fibers and returns once every one of
/// them has terminated.
///
/// Each argument may be a `Fiber`, a workspace-d `Future` (only when built
/// with workspace-d), anything accepted by the `Fiber` constructor (such as a
/// function or delegate), or an input range of any of these.
///
/// `Fiber` objects passed in are handed to a local `FiberManager`, which
/// releases them after they terminate, so they must not be used afterwards.
///
/// When called from inside a fiber, control is yielded to the enclosing
/// scheduler between passes. When called outside of any fiber, the calling
/// thread yields its time slice instead, because `Fiber.yield` requires a
/// running fiber.
///
/// Params:
///   fiberSize = stack size in bytes for fibers created by this function
///   fibers = the tasks to run until completion
void joinAll(size_t fiberSize = 4096 * 48, Fibers...)(Fibers fibers)
{
	FiberManager f;
	enum anyInputRanges = hasInputRanges!Fibers;
	static if (anyInputRanges)
	{
		// The number of fibers is only known at runtime when ranges are
		// involved, so collect them in a dynamic array.
		Fiber[] converted;
		converted.reserve(Fibers.length);
		void addFiber(Fiber fiber)
		{
			converted ~= fiber;
		}
	}
	else
	{
		// Exactly one fiber per argument: a fixed-size array is sufficient.
		size_t i;
		Fiber[Fibers.length] converted;

		void addFiber(Fiber fiber)
		{
			converted[i++] = fiber;
		}
	}

	foreach (fiber; fibers)
	{
		static if (isInputRange!(typeof(fiber)))
		{
			foreach (fib; fiber)
			{
				static if (is(typeof(fib) : Fiber))
					addFiber(fib);
				else static if (hasFuture && is(typeof(fib) : Future!T, T))
					addFiber(new Fiber(&fib.getYield, fiberSize));
				else
					addFiber(new Fiber(fib, fiberSize));
			}
		}
		else
		{
			static if (is(typeof(fiber) : Fiber))
				addFiber(fiber);
			else static if (hasFuture && is(typeof(fiber) : Future!T, T))
				addFiber(new Fiber(&fiber.getYield, fiberSize));
			else
				addFiber(new Fiber(fiber, fiberSize));
		}
	}
	f.fibers = converted[];

	// Fiber.yield must only be called while a fiber is executing; calling it
	// from plain thread context is an error. Determine once which kind of
	// yield is valid for this invocation.
	immutable insideFiber = Fiber.getThis() !is null;
	while (f.length)
	{
		f.call();
		if (insideFiber)
			Fiber.yield();
		else
			Thread.yield();
	}
}