module served.lsp.filereader;

import core.thread;
import core.sync.mutex;

import std.algorithm;
import std.stdio;

/// A file reader implementation using the Win32 API using events. Reads as much
/// content as possible when new data is available at once, making the file
/// reading operation much more efficient when large chunks of data are being
/// transmitted.
///
/// The reader thread waits on the standard input handle in 1 second slices so
/// that a stop request is noticed at the latest after that timeout.
version (Windows) class WindowsStdinReader : FileReader
{
	import core.sys.windows.windows;
	import core.sync.event;

	this()
	{
		super();
		// Manual-reset, initially signaled (same as PosixFileReader). An
		// uninitialized Event never blocks in wait(), so without this stop()
		// would return before the reader thread has actually finished.
		closeEvent.initialize(true, true);
	}

	/// Requests the reader thread to stop and waits up to 5 seconds for it to
	/// leave its `run` method.
	override void stop()
	{
		wantStop = true;
		closeEvent.wait(5.seconds);
	}

	/// Thread body: polls stdin and appends everything that was read to the
	/// shared `data` buffer until EOF, a stop request or too many errors.
	override void run()
	{
		closeEvent.reset();
		scope (exit)
			closeEvent.set();

		// NB: shadows std.stdio.stdin inside this method on purpose, this is
		// the raw Win32 handle.
		auto stdin = GetStdHandle(STD_INPUT_HANDLE);
		ubyte[4096] buffer;
		// Counts failed waits/reads so that a permanently broken handle does
		// not make this loop spin and spam stderr forever (same limit as in
		// WindowsFileReader).
		int errorCount = 0;

		while (!wantStop)
		{
			switch (WaitForSingleObject(stdin, 1000))
			{
			case WAIT_TIMEOUT:
				// nothing available, check wantStop again
				break;
			case WAIT_OBJECT_0:
				DWORD len;
				if (!ReadFile(stdin, buffer.ptr, buffer.length, &len, null))
				{
					stderr.writeln("ReadFile failed ", GetLastError());
					errorCount++;
					break;
				}
				if (len == 0)
				{
					stderr.writeln("WindowsStdinReader EOF");
					return;
				}
				synchronized (mutex)
					data ~= buffer[0 .. len];
				break;
			case WAIT_FAILED:
				stderr.writeln("stdin read failed ", GetLastError());
				errorCount++;
				break;
			case WAIT_ABANDONED:
				stderr.writeln("stdin read wait was abandoned ", GetLastError());
				break;
			default:
				stderr.writeln("Unexpected WaitForSingleObject response");
				break;
			}

			if (errorCount > 10)
			{
				stderr.writeln("Closing WindowsStdinReader because too many errors");
				break;
			}
		}
	}

	/// Returns: `true` as long as the reader thread is running.
	override bool isReading()
	{
		return isRunning;
	}

	private bool wantStop;
	private Event closeEvent;
}

version (Windows)
{
	// Win32 BOOL is a 32-bit integer, so the binding returns `int` instead of
	// D's 1-byte `bool`.
	private extern (Windows) int CancelSynchronousIo(void* hThread);
}

/// A file reader implementation for arbitrary files using blocking Win32
/// `ReadFile` calls on a separate thread. A stop request cancels a pending
/// read using `CancelSynchronousIo`.
version (Windows) class WindowsFileReader : FileReader
{
	import core.sys.windows.windows;
	import core.sync.event;

	/// Params:
	///   file = the file to read from. Only its Win32 handle is stored.
	///     NOTE(review): the `File` itself is not kept alive here, so the
	///     caller presumably has to keep it open while reading - confirm.
	this(File file)
	{
		handle = file.windowsHandle;
		super();
		// see WindowsStdinReader: required so that stop() really waits
		closeEvent.initialize(true, true);
	}

	/// Requests the reader thread to stop, cancels a pending blocking read and
	/// waits up to 5 seconds for the thread to leave its `run` method.
	override void stop()
	{
		wantStop = true;
		// `thread` is published and closed by the reader thread, only touch it
		// while holding the mutex so the handle can't be closed under us.
		synchronized (mutex)
		{
			if (thread !is null)
				CancelSynchronousIo(thread);
		}
		closeEvent.wait(5.seconds);
	}

	/// Thread body: performs blocking reads and appends everything that was
	/// read to the shared `data` buffer until EOF, a stop request or too many
	/// errors.
	override void run()
	{
		closeEvent.reset();
		scope (exit)
			closeEvent.set();

		ubyte[4096] buffer;

		// GetCurrentThread only returns a pseudo handle that always refers to
		// whichever thread uses it. stop() runs on a different thread, so it
		// needs a real handle, which is obtained by duplicating the pseudo
		// handle.
		HANDLE self;
		if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
				GetCurrentProcess(), &self, 0, FALSE, DUPLICATE_SAME_ACCESS))
		{
			synchronized (mutex)
				thread = self;
		}
		else
		{
			stderr.writeln("WindowsFileReader failed to duplicate thread handle ", GetLastError());
		}
		// runs before the closeEvent.set() above (scope guards run in reverse)
		scope (exit)
		{
			synchronized (mutex)
			{
				if (thread !is null)
				{
					CloseHandle(thread);
					thread = null;
				}
			}
		}

		int errorCount = 0;

		while (!wantStop)
		{
			DWORD numRead;
			if (!ReadFile(handle, buffer.ptr, buffer.length, &numRead, null))
			{
				auto error = GetLastError();
				// read was cancelled by stop(), the loop condition ends the loop
				if (error == ERROR_OPERATION_ABORTED)
					continue;
				stderr.writeln("WindowsFileReader failed with ", error);
				errorCount++;
				if (errorCount > 10)
				{
					stderr.writeln("Closing WindowsFileReader because too many errors");
					break;
				}
				continue;
			}
			// A successful read of 0 bytes means end of file. Without this
			// check the loop would spin at full speed once the end is reached.
			if (numRead == 0)
				break;
			synchronized (mutex)
				data ~= buffer[0 .. numRead];
		}
	}

	/// Returns: `true` as long as the reader thread is running.
	override bool isReading()
	{
		return isRunning;
	}

	private bool wantStop;
	private Event closeEvent;
	private HANDLE handle;
	/// Real (duplicated) handle of the reader thread, `null` while the reader
	/// is not running. Guarded by `mutex`.
	private HANDLE thread;
}

/// A file reader implementation using the POSIX select API using events. Reads
/// as much content as possible when new data is available at once, making the
/// file reading operation much more efficient when large chunks of data are
/// being transmitted.
///
/// Ideally would want to implement Epoll and Kqueue implementations of this
/// reader instead, to support much longer timeouts with proper stop methods.
version (Posix) class PosixStdinReader : PosixFileReader
{
	this()
	{
		File f;
		f.fdopen(0); // use stdin even if std.stdio.stdin is changed
		super(f);
	}
}

/// ditto
version (Posix) class PosixFileReader : FileReader
{
	import core.stdc.errno;
	import core.sys.posix.sys.select;
	import core.sys.posix.sys.time;
	import core.sys.posix.sys.types;
	import core.sys.posix.unistd;
	import core.sync.event;

	/// The file that is being read, closed when the reader thread exits.
	File stdFile;
	/// Signaled whenever the reader thread is not inside `run`.
	Event closeEvent;
	/// Set by `stop` to make the reader thread leave its loop.
	bool wantStop;

	this(File stdFile)
	{
		this.stdFile = stdFile;
		// manual-reset, initially signaled: stop() before start() won't block
		this.closeEvent = Event(true, true);
	}

	/// Requests the reader thread to stop and waits up to 5 seconds for it to
	/// leave its `run` method.
	override void stop()
	{
		wantStop = true;
		closeEvent.wait(5.seconds);
	}

	/// Thread body: waits for data using `select` in 1 second slices and
	/// appends everything that was read to the shared `data` buffer until
	/// EOF, a stop request or an unrecoverable error.
	override void run()
	{
		closeEvent.reset();
		scope (exit)
			closeEvent.set();
		int fd = stdFile.fileno;

		ubyte[4096] buffer;
		scope (exit)
			stdFile.close();

		wantStop = false;
		while (!wantStop)
		{
			fd_set rfds;
			timeval tv;

			FD_ZERO(&rfds);
			FD_SET(fd, &rfds);

			// tv_usec stays at its default of 0: wake up once per second to
			// re-check wantStop
			tv.tv_sec = 1;

			auto ret = select(fd + 1, &rfds, null, null, &tv);

			if (ret == -1)
			{
				int err = errno;
				if (err == EINTR)
					continue;
				stderr.writeln("[fatal] PosixStdinReader error ", err, " in select()");
				break;
			}
			else if (ret)
			{
				auto len = read(fd, buffer.ptr, buffer.length);
				if (len == -1)
				{
					int err = errno;
					if (err == EINTR)
						continue;
					// print the saved value instead of re-reading errno
					stderr.writeln("PosixStdinReader error ", err, " in read()");
					break;
				}
				else if (len == 0)
				{
					break; // eof
				}
				else
				{
					synchronized (mutex)
						data ~= buffer[0 .. len];
				}
			}
		}
	}

	/// Returns: `true` while the reader thread is running and the process
	/// wide `std.stdio.stdin` has neither its EOF nor its error flag set.
	///
	/// NOTE(review): this checks the global `stdin`, not `stdFile`, which
	/// looks unintended for readers created through `newFileReader` - confirm
	/// before changing, callers may rely on it.
	override bool isReading()
	{
		return isRunning && !stdin.eof && !stdin.error;
	}
}

/// Base class for file readers which can read a file or standard handle line
/// by line in a Fiber context, yielding until a line is available.
///
/// Implementations run as daemon thread, append everything they read to
/// `data` while holding `mutex` and the `yield*` methods consume from it.
abstract class FileReader : Thread
{
	this()
	{
		super(&run);
		isDaemon = true;
		mutex = new Mutex();
	}

	/// Yields until a full line terminated by `"\r\n"` is available, then
	/// removes it (including the line terminator) from the incoming data
	/// stream and returns a copy of it without the terminator.
	///
	/// Must be called from within a Fiber because it uses `Fiber.yield` to
	/// wait.
	///
	/// Params:
	///   whileThisIs = optional flag that is checked before every attempt,
	///     waiting is aborted once it no longer equals `equalToThis`.
	///   equalToThis = the value `*whileThisIs` must have to continue.
	/// Returns: the line or `null` if waiting was aborted or if the reader is
	///   not reading (anymore) while no data is buffered.
	string yieldLine(bool* whileThisIs = null, bool equalToThis = true)
	{
		ptrdiff_t index;
		string ret;
		while (whileThisIs is null || *whileThisIs == equalToThis)
		{
			bool hasData;
			synchronized (mutex)
			{
				index = data.countUntil([cast(ubyte) '\r', cast(ubyte) '\n']);
				if (index != -1)
				{
					ret = cast(string) data[0 .. index].dup;
					data = data[index + 2 .. $];
					break;
				}

				hasData = data.length != 0;
			}

			// NOTE(review): when the reader has stopped but an incomplete
			// line is still buffered this keeps yielding until `whileThisIs`
			// changes - confirm that this is intended.
			if (!hasData && !isReading)
				return null;

			Fiber.yield();
		}
		return ret;
	}

	/// Yields until the specified length of data is available, then removes the
	/// data from the incoming data stream atomically and returns a duplicate of
	/// it.
	///
	/// Must be called from within a Fiber because it uses `Fiber.yield` to
	/// wait.
	///
	/// Params:
	///   length = number of bytes to wait for and to return.
	///   whileThisIs = optional flag that is checked before every attempt,
	///     waiting is aborted once it no longer equals `equalToThis`.
	///   equalToThis = the value `*whileThisIs` must have to continue.
	/// Returns: the data or `null` if waiting was aborted or if the reader is
	///   not reading (anymore) while no data is buffered.
	ubyte[] yieldData(size_t length, bool* whileThisIs = null, bool equalToThis = true)
	{
		while (whileThisIs is null || *whileThisIs == equalToThis)
		{
			bool hasData;
			synchronized (mutex)
			{
				if (data.length >= length)
				{
					auto ret = data[0 .. length].dup;
					data = data[length .. $];
					return ret;
				}

				hasData = data.length != 0;
			}

			// NOTE(review): same as in yieldLine, buffered but insufficient
			// data keeps this waiting even if the reader has stopped.
			if (!hasData && !isReading)
				return null;

			Fiber.yield();
		}
		return null;
	}

	/// Asks the reader thread to stop reading.
	abstract void stop();
	/// Returns: whether the reader may still deliver more data.
	abstract bool isReading();

protected:
	/// Thread body that fills `data`.
	abstract void run();

	/// Bytes that have been read but not consumed yet, guarded by `mutex`.
	ubyte[] data;
	/// Guards `data`.
	Mutex mutex;
}

/// Creates a new FileReader using the GC reading from stdin using a platform
/// optimized implementation or StdFileReader if none is available.
///
/// The created instance can then be started using the `start` method and
/// stopped at exit using the `stop` method.
///
/// Examples:
/// ---
/// auto input = newStdinReader();
/// input.start();
/// scope (exit)
/// 	input.stop();
/// ---
FileReader newStdinReader()
{
	version (Windows)
		return new WindowsStdinReader();
	else version (Posix)
		return new PosixStdinReader();
	else
		static assert(false, "no stdin reader for this platform implemented");
}

/// ditto
FileReader newFileReader(File stdFile)
{
	version (Windows)
		return new WindowsFileReader(stdFile);
	else version (Posix)
		return new PosixFileReader(stdFile);
	else
		static assert(false, "no generic file reader for this platform implemented");
}

/// Reads a file into a given buffer with a specified maximum length. If the
/// file is bigger than the buffer, the buffer will be resized using the GC and
/// updated through the ref argument.
/// Params:
///   file = The filename of the file to read.
///   buffer = A GC allocated buffer that may be enlarged if it is too small.
///   maxLen = The maximum amount of bytes to read from the file.
/// Returns: The contents of the file up to maxLen or EOF. The data is a slice
/// of the buffer argument cast to a `char[]`.
char[] readCodeWithBuffer(string file, scope return ref ubyte[] buffer, size_t maxLen = 1024 * 50)
in (buffer.length > 0)
{
	auto f = File(file, "rb");
	// number of valid bytes at the start of buffer
	size_t len;

	// 1. try to fit the file into the buffer as it was passed in
	while (len < buffer.length)
	{
		len += f.rawRead(buffer[len .. $]).length;
		if (f.eof)
			return cast(char[]) buffer[0 .. min(maxLen, len)];
	}

	// 2. keep doubling the buffer while that stays below maxLen
	while (buffer.length * 2 < maxLen)
	{
		buffer.length *= 2;
		while (len < buffer.length)
		{
			len += f.rawRead(buffer[len .. $]).length;
			if (f.eof)
				return cast(char[]) buffer[0 .. min(maxLen, len)];
		}
	}

	if (buffer.length >= maxLen)
		return cast(char[]) buffer[0 .. maxLen];

	// 3. last step: grow to exactly maxLen and read the remainder. The file
	// may end before the buffer is full, so only the bytes that were actually
	// read are returned, not the untouched (zeroed) tail of the buffer.
	buffer.length = maxLen;
	len += f.rawRead(buffer[len .. $]).length;
	return cast(char[]) buffer[0 .. len];
}

unittest
{
	ubyte[2048] buffer;
	auto slice = buffer[];
	assert(slice.ptr is buffer.ptr);
	auto code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice);
	assert(slice.ptr !is buffer.ptr);
	assert(code[0 .. 29] == "module served.lsp.filereader;");

	slice = new ubyte[1024 * 64]; // enough to store full file
	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice);
	assert(code[0 .. 29] == "module served.lsp.filereader;");

	// with max length
	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice, 16);
	assert(code == "module served.ls");

	// with max length and small buffer
	slice = new ubyte[8];
	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice, 16);
	assert(code == "module served.ls");

	// small buffer not aligning
	slice = new ubyte[7];
	code = readCodeWithBuffer("lsp/source/served/lsp/filereader.d", slice, 16);
	assert(code == "module served.ls");

	// file that is longer than the initial buffer but shorter than maxLen:
	// must not return padding after the end of the file
	{
		static import std.file;
		import std.path : buildPath;

		auto tmp = buildPath(std.file.tempDir, "served_readCodeWithBuffer_unittest.txt");
		std.file.write(tmp, "0123456789");
		scope (exit)
			std.file.remove(tmp);

		slice = new ubyte[8];
		code = readCodeWithBuffer(tmp, slice, 16);
		assert(code == "0123456789");
	}
}