1 module served.utils.async;
2 
3 import core.sync.mutex : Mutex;
4 import core.time : Duration;
5 
6 import served.utils.fibermanager;
7 
8 import std.datetime.stopwatch : msecs, StopWatch;
9 import std.experimental.logger;
10 
11 __gshared void delegate(void delegate(), int pages, string file, int line) spawnFiberImpl;
12 __gshared int timeoutID;
13 __gshared Timeout[] timeouts;
14 __gshared Mutex timeoutsMutex;
15 
16 void spawnFiber(void delegate() cb, int pages = 20, string file = __FILE__, int line = __LINE__)
17 {
18 	if (spawnFiberImpl)
19 		spawnFiberImpl(cb, pages, file, line);
20 	else
21 		setImmediate(cb);
22 }
23 
24 // Called at most 100x per second
25 void parallelMain()
26 {
27 	timeoutsMutex = new Mutex;
28 	void delegate()[32] callsBuf;
29 	void delegate()[] calls;
30 	while (true)
31 	{
32 		synchronized (timeoutsMutex)
33 			foreach_reverse (i, ref timeout; timeouts)
34 			{
35 				if (timeout.sw.peek >= timeout.timeout)
36 				{
37 					timeout.sw.stop();
38 					trace("Calling timeout");
39 					callsBuf[calls.length] = timeout.callback;
40 					calls = callsBuf[0 .. calls.length + 1];
41 					if (timeouts.length > 1)
42 						timeouts[i] = timeouts[$ - 1];
43 					timeouts.length--;
44 
45 					if (calls.length >= callsBuf.length)
46 						break;
47 				}
48 			}
49 
50 		foreach (call; calls)
51 			call();
52 
53 		callsBuf[] = null;
54 		calls = null;
55 		Fiber.yield();
56 	}
57 }
58 
59 struct Timeout
60 {
61 	StopWatch sw;
62 	Duration timeout;
63 	void delegate() callback;
64 	int id;
65 }
66 
67 int setTimeout(void delegate() callback, int ms)
68 {
69 	return setTimeout(callback, ms.msecs);
70 }
71 
72 void setImmediate(void delegate() callback)
73 {
74 	setTimeout(callback, 0);
75 }
76 
77 int setTimeout(void delegate() callback, Duration timeout)
78 {
79 	trace("Setting timeout for ", timeout);
80 	Timeout to;
81 	to.timeout = timeout;
82 	to.callback = callback;
83 	to.sw.start();
84 	synchronized (timeoutsMutex)
85 	{
86 		to.id = ++timeoutID;
87 		timeouts ~= to;
88 	}
89 	return to.id;
90 }
91 
92 void clearTimeout(int id)
93 {
94 	synchronized (timeoutsMutex)
95 		foreach_reverse (i, ref timeout; timeouts)
96 		{
97 			if (timeout.id == id)
98 			{
99 				timeout.sw.stop();
100 				if (timeouts.length > 1)
101 					timeouts[i] = timeouts[$ - 1];
102 				timeouts.length--;
103 				return;
104 			}
105 		}
106 }