1 module served.utils.async;
2 
3 import core.sync.mutex : Mutex;
4 import core.time : Duration;
5 
6 import served.utils.fibermanager;
7 
8 import std.datetime.stopwatch : msecs, StopWatch;
9 import std.experimental.logger;
10 
// Process-wide (not thread-local) state shared by the fiber and timeout helpers
// in this module.
__gshared
{
	/// Optional hook that `spawnFiber` forwards to. When it is unset,
	/// `spawnFiber` falls back to `setImmediate`.
	void delegate(void delegate(), int pages, string file, int line) spawnFiberImpl;

	/// `pages` value used by the `spawnFiber` overload that takes no explicit
	/// page count.
	int defaultFiberPages = 20;

	/// Most recently issued timeout ID; incremented by `setTimeout` while
	/// holding `timeoutsMutex`.
	int timeoutID;

	/// Timeouts that have been registered and not yet fired or cleared.
	/// Accessed under `timeoutsMutex`.
	Timeout[] timeouts;

	/// Lock guarding `timeouts` and `timeoutID`.
	/// NOTE(review): in the visible source this is only assigned at the start
	/// of `parallelMain` - confirm nothing registers a timeout before that.
	Mutex timeoutsMutex;
}
16 
/// Runs `cb` through the fiber spawning mechanism using `defaultFiberPages`
/// as the page count.
///
/// Params:
///   cb = the delegate to run
///   file = call-site file name, forwarded unchanged (defaults to the caller's `__FILE__`)
///   line = call-site line number, forwarded unchanged (defaults to the caller's `__LINE__`)
void spawnFiber(void delegate() cb, string file = __FILE__, int line = __LINE__)
{
	// Delegates to the overload that takes an explicit page count.
	cb.spawnFiber(defaultFiberPages, file, line);
}
21 
/// Runs `cb` via `spawnFiberImpl` when that hook is set; otherwise schedules
/// it with `setImmediate`.
///
/// Params:
///   cb = the delegate to run
///   pages = page count handed to `spawnFiberImpl` (ignored by the fallback path)
///   file = call-site file name handed to `spawnFiberImpl`
///   line = call-site line number handed to `spawnFiberImpl`
void spawnFiber(void delegate() cb, int pages, string file = __FILE__, int line = __LINE__)
{
	// No spawner registered: queue the callback as a zero-delay timeout instead.
	if (!spawnFiberImpl)
	{
		setImmediate(cb);
		return;
	}

	spawnFiberImpl(cb, pages, file, line);
}
29 
/// Endless timer loop that fires the callbacks registered through `setTimeout`.
///
/// On entry it creates `timeoutsMutex`. Each pass then:
/// $(OL
///   $(LI locks `timeoutsMutex` and walks `timeouts` from the back, collecting the
///        callbacks of expired entries (at most 32 per pass) and removing those
///        entries by overwriting them with the current last element;)
///   $(LI releases the lock and invokes the collected callbacks;)
///   $(LI clears the collection buffer and yields the current fiber.)
/// )
///
/// Original note: called at most 100x per second (no throttling is implemented
/// inside this function itself).
///
/// NOTE(review): a callback that throws propagates out of this function and
/// ends the loop - confirm that is the intended behaviour.
void parallelMain()
{
	timeoutsMutex = new Mutex;

	// Fixed-size scratch buffer so collecting due callbacks never allocates.
	void delegate()[32] pending;
	size_t numPending;

	for (;;)
	{
		synchronized (timeoutsMutex)
		{
			// Walk backwards: a removed slot is refilled with the last element,
			// which has already been inspected during this pass.
			for (size_t idx = timeouts.length; idx-- > 0;)
			{
				if (timeouts[idx].sw.peek < timeouts[idx].timeout)
					continue;

				timeouts[idx].sw.stop();
				trace("Calling timeout");
				pending[numPending++] = timeouts[idx].callback;

				// Unordered removal of the expired entry.
				if (timeouts.length > 1)
					timeouts[idx] = timeouts[$ - 1];
				timeouts.length--;

				// Buffer full: leave the remaining entries for the next pass.
				if (numPending >= pending.length)
					break;
			}
		}

		// Callbacks are invoked after the synchronized block has been left.
		foreach (fire; pending[0 .. numPending])
			fire();

		// Drop the delegate references before suspending.
		pending[] = null;
		numPending = 0;
		Fiber.yield();
	}
}
64 
/// One pending timer entry as stored in the module-level `timeouts` array.
struct Timeout
{
	/// Stopwatch started by `setTimeout`; its elapsed time is compared against `timeout`.
	StopWatch sw;
	/// Delay after which the entry is considered expired.
	Duration timeout;
	/// Delegate that `parallelMain` invokes once the entry has expired.
	void delegate() callback;
	/// Identifier returned by `setTimeout` and matched by `clearTimeout`.
	int id;
}
72 
/// Convenience overload of `setTimeout` taking the delay in milliseconds.
///
/// Params:
///   callback = delegate to invoke once the delay has elapsed
///   ms = delay in milliseconds
/// Returns: the ID of the registered timeout, as returned by the `Duration` overload.
int setTimeout(void delegate() callback, int ms)
{
	const Duration delay = msecs(ms);
	return setTimeout(callback, delay);
}
77 
/// Registers `callback` as a timeout with zero delay. The ID produced by
/// `setTimeout` is discarded.
///
/// Params:
///   callback = delegate to schedule
void setImmediate(void delegate() callback)
{
	// A zero-length Duration is the same value the millisecond overload would
	// produce for 0 ms.
	setTimeout(callback, Duration.zero);
}
82 
/// Registers `callback` to be fired by `parallelMain` once `timeout` has elapsed.
///
/// The entry's stopwatch is started before the lock is taken; the ID is
/// assigned and the entry appended to `timeouts` while holding `timeoutsMutex`.
///
/// Params:
///   callback = delegate to invoke when the timeout expires
///   timeout = delay measured from this call
/// Returns: the ID assigned to the new entry, usable with `clearTimeout`.
///
/// NOTE(review): `timeoutsMutex` is only assigned in `parallelMain` in the
/// visible source - confirm it exists before this is first called.
int setTimeout(void delegate() callback, Duration timeout)
{
	trace("Setting timeout for ", timeout);

	Timeout entry;
	entry.timeout = timeout;
	entry.callback = callback;
	entry.sw.start();

	int assignedId;
	synchronized (timeoutsMutex)
	{
		assignedId = ++timeoutID;
		entry.id = assignedId;
		timeouts ~= entry;
	}
	return assignedId;
}
97 
/// Cancels the pending timeout with the given ID, if one is still registered.
///
/// Searches `timeouts` from the back while holding `timeoutsMutex`; the first
/// match has its stopwatch stopped and is removed by overwriting it with the
/// last element. Does nothing when no entry carries `id`.
///
/// Params:
///   id = identifier previously returned by `setTimeout`
void clearTimeout(int id)
{
	synchronized (timeoutsMutex)
	{
		for (size_t idx = timeouts.length; idx-- > 0;)
		{
			if (timeouts[idx].id != id)
				continue;

			timeouts[idx].sw.stop();
			// Unordered removal: move the last entry into the freed slot.
			if (timeouts.length > 1)
				timeouts[idx] = timeouts[$ - 1];
			timeouts.length--;
			return;
		}
	}
}