1 module served.lsp.filereader;
2 
3 import core.thread;
4 import core.sync.mutex;
5 
6 import std.algorithm;
7 import std.stdio;
8 
9 /// A file reader implementation using the Win32 API using events. Reads as much
10 /// content as possible when new data is available at once, making the file
11 /// reading operation much more efficient when large chunks of data are being
12 /// transmitted.
13 version (Windows) class WindowsStdinReader : FileReader
14 {
15 	import core.sys.windows.windows;
16 	import core.sync.event;
17 
18 	this()
19 	{
20 		super();
21 	}
22 
23 	override void stop()
24 	{
25 		wantStop = true;
26 		closeEvent.wait(5.seconds);
27 	}
28 
29 	override void run()
30 	{
31 		closeEvent.reset();
32 		scope (exit)
33 			closeEvent.set();
34 
35 		auto stdin = GetStdHandle(STD_INPUT_HANDLE);
36 		ubyte[4096] buffer;
37 
38 		while (!wantStop)
39 		{
40 			switch (WaitForSingleObject(stdin, 1000))
41 			{
42 			case WAIT_TIMEOUT:
43 				break;
44 			case WAIT_OBJECT_0:
45 				DWORD len;
46 				if (!ReadFile(stdin, &buffer, buffer.length, &len, null))
47 				{
48 					stderr.writeln("ReadFile failed ", GetLastError());
49 					break;
50 				}
51 				if (len == 0)
52 				{
53 					stderr.writeln("WindowsStdinReader EOF");
54 					return;
55 				}
56 				synchronized (mutex)
57 					data ~= buffer[0 .. len];
58 				break;
59 			case WAIT_FAILED:
60 				stderr.writeln("stdin read failed ", GetLastError());
61 				break;
62 			case WAIT_ABANDONED:
63 				stderr.writeln("stdin read wait was abandoned ", GetLastError());
64 				break;
65 			default:
66 				stderr.writeln("Unexpected WaitForSingleObject response");
67 				break;
68 			}
69 		}
70 	}
71 
72 	override bool isReading()
73 	{
74 		return isRunning;
75 	}
76 
77 	private bool wantStop;
78 	private Event closeEvent;
79 }
80 
81 version (Windows)
82 {
83 	private extern(Windows) bool CancelSynchronousIo(void* hThread);
84 }
85 
86 
87 /// ditto
88 version (Windows) class WindowsFileReader : FileReader
89 {
90 	import core.sys.windows.windows;
91 	import core.sync.event;
92 
93 	this(File file)
94 	{
95 		handle = file.windowsHandle;
96 		super();
97 	}
98 
99 	override void stop()
100 	{
101 		wantStop = true;
102 		CancelSynchronousIo(thread);
103 		closeEvent.wait(5.seconds);
104 	}
105 
106 	override void run()
107 	{
108 		closeEvent.reset();
109 		scope (exit)
110 			closeEvent.set();
111 
112 		ubyte[4096] buffer;
113 
114 		thread = GetCurrentThread();
115 		int errorCount = 0;
116 
117 		while (!wantStop)
118 		{
119 			DWORD numRead;
120 			if (!ReadFile(handle, buffer.ptr, buffer.length, &numRead, null))
121 			{
122 				auto error = GetLastError();
123 				if (error == ERROR_OPERATION_ABORTED)
124 					continue;
125 				stderr.writeln("WindowsStdinReader failed with ", error);
126 				errorCount++;
127 				if (errorCount > 10)
128 				{
129 					stderr.writeln("Closing WindowsStdinReader because too many errors");
130 					break;
131 				}
132 				continue;
133 			}
134 			synchronized (mutex)
135 				data ~= buffer[0 .. numRead];
136 		}
137 	}
138 
139 	override bool isReading()
140 	{
141 		return isRunning;
142 	}
143 
144 	private bool wantStop;
145 	private Event closeEvent;
146 	private HANDLE handle;
147 	private HANDLE thread;
148 }
149 
150 /// A file reader implementation using the POSIX select API using events. Reads
151 /// as much content as possible when new data is available at once, making the
152 /// file reading operation much more efficient when large chunks of data are
153 /// being transmitted.
154 ///
155 /// Ideally would want to implement Epoll and Kqueue implementations of this
156 /// reader instead, to support much longer timeouts with proper stop methods.
157 version (Posix) class PosixStdinReader : PosixFileReader
158 {
159 	this()
160 	{
161 		File f;
162 		f.fdopen(0); // use stdin even if std.stdio.stdin is changed
163 		super(f);
164 	}
165 }
166 
167 /// ditto
168 version (Posix) class PosixFileReader : FileReader
169 {
170 	import core.stdc.errno;
171 	import core.sys.posix.sys.select;
172 	import core.sys.posix.sys.time;
173 	import core.sys.posix.sys.types;
174 	import core.sys.posix.unistd;
175 	import core.sync.event;
176 
177 	File stdFile;
178 	Event closeEvent;
179 	bool wantStop;
180 
181 	this(File stdFile)
182 	{
183 		this.stdFile = stdFile;
184 		this.closeEvent = Event(true, true);
185 	}
186 
187 	override void stop()
188 	{
189 		wantStop = true;
190 		closeEvent.wait(5.seconds);
191 	}
192 
193 	override void run()
194 	{
195 		closeEvent.reset();
196 		scope (exit)
197 			closeEvent.set();
198 		int fd = stdFile.fileno;
199 
200 		ubyte[4096] buffer;
201 		scope (exit)
202 			stdFile.close();
203 
204 		wantStop = false;
205 		while (!wantStop)
206 		{
207 			fd_set rfds;
208 			timeval tv;
209 
210 			FD_ZERO(&rfds);
211 			FD_SET(fd, &rfds);
212 
213 			tv.tv_sec = 1;
214 
215 			auto ret = select(fd + 1, &rfds, null, null, &tv);
216 
217 			if (ret == -1)
218 			{
219 				int err = errno;
220 				if (err == EINTR)
221 					continue;
222 				stderr.writeln("[fatal] PosixStdinReader error ", err, " in select()");
223 				break;
224 			}
225 			else if (ret)
226 			{
227 				auto len = read(fd, buffer.ptr, buffer.length);
228 				if (len == -1)
229 				{
230 					int err = errno;
231 					if (err == EINTR)
232 						continue;
233 					stderr.writeln("PosixStdinReader error ", errno, " in read()");
234 					break;
235 				}
236 				else if (len == 0)
237 				{
238 					break; // eof
239 				}
240 				else
241 				{
242 					synchronized (mutex)
243 						data ~= buffer[0 .. len];
244 				}
245 			}
246 		}
247 	}
248 
249 	override bool isReading()
250 	{
251 		return isRunning && !stdin.eof && !stdin.error;
252 	}
253 }
254 
255 /// Base class for file readers which can read a file or standard handle line
256 /// by line in a Fiber context, yielding until a line is available.
257 abstract class FileReader : Thread
258 {
259 	this()
260 	{
261 		super(&run);
262 		isDaemon = true;
263 		mutex = new Mutex();
264 	}
265 
266 	string yieldLine(bool* whileThisIs = null, bool equalToThis = true)
267 	{
268 		ptrdiff_t index;
269 		string ret;
270 		while (whileThisIs is null || *whileThisIs == equalToThis)
271 		{
272 			bool hasData;
273 			synchronized (mutex)
274 			{
275 				index = data.countUntil([cast(ubyte) '\r', cast(ubyte) '\n']);
276 				if (index != -1)
277 				{
278 					ret = cast(string) data[0 .. index].dup;
279 					data = data[index + 2 .. $];
280 					break;
281 				}
282 
283 				hasData = data.length != 0;
284 			}
285 
286 			if (!hasData && !isReading)
287 				return ret.length ? ret : null;
288 
289 			Fiber.yield();
290 		}
291 		return ret;
292 	}
293 
294 	/// Yields until the specified length of data is available, then removes the
295 	/// data from the incoming data stream atomically and returns a duplicate of
296 	/// it.
297 	/// Returns null if the file reader stops while reading.
298 	ubyte[] yieldData(size_t length, bool* whileThisIs = null, bool equalToThis = true)
299 	{
300 		while (whileThisIs is null || *whileThisIs == equalToThis)
301 		{
302 			bool hasData;
303 			synchronized (mutex)
304 			{
305 				if (data.length >= length)
306 				{
307 					auto ret = data[0 .. length].dup;
308 					data = data[length .. $];
309 					return ret;
310 				}
311 
312 				hasData = data.length != 0;
313 			}
314 
315 			if (!hasData && !isReading)
316 				return null;
317 
318 			Fiber.yield();
319 		}
320 		return null;
321 	}
322 
323 	abstract void stop();
324 	abstract bool isReading();
325 
326 protected:
327 	abstract void run();
328 
329 	ubyte[] data;
330 	Mutex mutex;
331 }
332 
333 /// Creates a new FileReader using the GC reading from stdin using a platform
334 /// optimized implementation or StdFileReader if none is available.
335 ///
336 /// The created instance can then be started using the `start` method and
337 /// stopped at exit using the `stop` method.
338 ///
339 /// Examples:
340 /// ---
341 /// auto input = newStdinReader();
342 /// input.start();
343 /// scope (exit)
344 ///     input.stop();
345 /// ---
346 FileReader newStdinReader()
347 {
348 	version (Windows)
349 		return new WindowsStdinReader();
350 	else version (Posix)
351 		return new PosixStdinReader();
352 	else
353 		static assert(false, "no stdin reader for this platform implemented");
354 }
355 
356 /// ditto
357 FileReader newFileReader(File stdFile)
358 {
359 	version (Windows)
360 		return new WindowsFileReader(stdFile);
361 	else version (Posix)
362 		return new PosixFileReader(stdFile);
363 	else
364 		static assert(false, "no generic file reader for this platform implemented");
365 }
366 
367 /// Reads a file into a given buffer with a specified maximum length. If the
368 /// file is bigger than the buffer, the buffer will be resized using the GC and
369 /// updated through the ref argument.
370 /// Params:
371 ///   file = The filename of the file to read.
372 ///   buffer = A GC allocated buffer that may be enlarged if it is too small.
373 ///   maxLen = The maxmimum amount of bytes to read from the file.
374 /// Returns: The contents of the file up to maxLen or EOF. The data is a slice
375 /// of the buffer argument case to a `char[]`.
376 char[] readCodeWithBuffer(string file, scope return ref ubyte[] buffer, size_t maxLen = 1024 * 50)
377 in (buffer.length > 0)
378 {
379 	auto f = File(file, "rb");
380 	size_t len;
381 	while (len < buffer.length)
382 	{
383 		len += f.rawRead(buffer[len .. $]).length;
384 		if (f.eof)
385 			return cast(char[]) buffer[0 .. min(maxLen, len)];
386 	}
387 	while (buffer.length * 2 < maxLen)
388 	{
389 		buffer.length *= 2;
390 		while (len < buffer.length)
391 		{
392 			len += f.rawRead(buffer[len .. $]).length;
393 			if (f.eof)
394 				return cast(char[]) buffer[0 .. min(maxLen, len)];
395 		}
396 	}
397 	if (buffer.length >= maxLen)
398 		return cast(char[]) buffer[0 .. maxLen];
399 	buffer.length = maxLen;
400 	f.rawRead(buffer[len .. $]);
401 	return cast(char[]) buffer;
402 }
403 
404 unittest
405 {
406 	ubyte[2048] buffer;
407 	auto slice = buffer[];
408 	assert(slice.ptr is buffer.ptr);
409 	auto code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice);
410 	assert(slice.ptr !is buffer.ptr);
411 	assert(code[0 .. 29] == "module served.lsp.filereader;");
412 
413 	slice = new ubyte[1024 * 64]; // enough to store full file
414 	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice);
415 	assert(code[0 .. 29] == "module served.lsp.filereader;");
416 
417 	// with max length
418 	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice, 16);
419 	assert(code == "module served.ls");
420 
421 	// with max length and small buffer
422 	slice = new ubyte[8];
423 	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice, 16);
424 	assert(code == "module served.ls");
425 
426 	// small buffer not aligning
427 	slice = new ubyte[7];
428 	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice, 16);
429 	assert(code == "module served.ls");
430 }