OILS / vendor / souffle / io / ReadStreamCSV.h View on Github | oils.pub

427 lines, 259 significant
1/*
2 * Souffle - A Datalog Compiler
3 * Copyright (c) 2021, The Souffle Developers. All rights reserved
4 * Licensed under the Universal Permissive License v 1.0 as shown at:
5 * - https://opensource.org/licenses/UPL
6 * - <souffle root>/licenses/SOUFFLE-UPL.txt
7 */
8
9/************************************************************************
10 *
11 * @file ReadStreamCSV.h
12 *
13 ***********************************************************************/
14
15#pragma once
16
17#include "souffle/RamTypes.h"
18#include "souffle/RecordTable.h"
19#include "souffle/SymbolTable.h"
20#include "souffle/io/ReadStream.h"
21#include "souffle/utility/ContainerUtil.h"
22#include "souffle/utility/FileUtil.h"
23#include "souffle/utility/StringUtil.h"
24
25#ifdef USE_LIBZ
26#include "souffle/io/gzfstream.h"
27#else
28#include <fstream>
29#endif
30
31#include <algorithm>
32#include <cassert>
33#include <cstddef>
34#include <cstdint>
35#include <iostream>
36#include <map>
37#include <memory>
38#include <sstream>
39#include <stdexcept>
40#include <string>
41#include <vector>
42
43namespace souffle {
44
45class ReadStreamCSV : public ReadStream {
46public:
47 ReadStreamCSV(std::istream& file, const std::map<std::string, std::string>& rwOperation,
48 SymbolTable& symbolTable, RecordTable& recordTable)
49 : ReadStream(rwOperation, symbolTable, recordTable),
50 rfc4180(getOr(rwOperation, "rfc4180", "false") == std::string("true")),
51 delimiter(getOr(rwOperation, "delimiter", (rfc4180 ? "," : "\t"))), file(file), lineNumber(0),
52 inputMap(getInputColumnMap(rwOperation, static_cast<unsigned int>(arity))) {
53 if (rfc4180 && delimiter.find('"') != std::string::npos) {
54 std::stringstream errorMessage;
55 errorMessage << "CSV delimiter cannot contain '\"' character when rfc4180 is enabled.";
56 throw std::invalid_argument(errorMessage.str());
57 }
58
59 while (inputMap.size() < arity) {
60 int size = static_cast<int>(inputMap.size());
61 inputMap[size] = size;
62 }
63 }
64
65protected:
66 bool readNextLine(std::string& line, bool& isCRLF) {
67 if (!getline(file, line)) {
68 return false;
69 }
70 // Handle Windows line endings on non-Windows systems
71 isCRLF = !line.empty() && line.back() == '\r';
72 if (isCRLF) {
73 line.pop_back();
74 }
75 ++lineNumber;
76 return true;
77 }
78
79 /**
80 * Read and return the next tuple.
81 *
82 * Returns nullptr if no tuple was readable.
83 * @return
84 */
85 Own<RamDomain[]> readNextTuple() override {
86 if (file.eof()) {
87 return nullptr;
88 }
89 std::string line;
90 Own<RamDomain[]> tuple = mk<RamDomain[]>(typeAttributes.size());
91 bool wasCRLF = false;
92 if (!readNextLine(line, wasCRLF)) {
93 return nullptr;
94 }
95
96 std::size_t start = 0;
97 std::size_t columnsFilled = 0;
98 for (uint32_t column = 0; columnsFilled < arity; column++) {
99 std::size_t charactersRead = 0;
100 std::string element = nextElement(line, start, wasCRLF);
101 if (inputMap.count(column) == 0) {
102 continue;
103 }
104 ++columnsFilled;
105
106 try {
107 auto&& ty = typeAttributes.at(inputMap[column]);
108 switch (ty[0]) {
109 case 's': {
110 tuple[inputMap[column]] = symbolTable.encode(element);
111 charactersRead = element.size();
112 break;
113 }
114 case 'r': {
115 tuple[inputMap[column]] = readRecord(element, ty, 0, &charactersRead);
116 break;
117 }
118 case '+': {
119 tuple[inputMap[column]] = readADT(element, ty, 0, &charactersRead);
120 break;
121 }
122 case 'i': {
123 tuple[inputMap[column]] = RamSignedFromString(element, &charactersRead);
124 break;
125 }
126 case 'u': {
127 tuple[inputMap[column]] = ramBitCast(readRamUnsigned(element, charactersRead));
128 break;
129 }
130 case 'f': {
131 tuple[inputMap[column]] = ramBitCast(RamFloatFromString(element, &charactersRead));
132 break;
133 }
134 default: fatal("invalid type attribute: `%c`", ty[0]);
135 }
136 // Check if everything was read.
137 if (charactersRead != element.size()) {
138 throw std::invalid_argument(
139 "Expected: " + delimiter + " or \\n. Got: " + element[charactersRead]);
140 }
141 } catch (...) {
142 std::stringstream errorMessage;
143 errorMessage << "Error converting <" + element + "> in column " << column + 1 << " in line "
144 << lineNumber << "; ";
145 throw std::invalid_argument(errorMessage.str());
146 }
147 }
148
149 return tuple;
150 }
151
152 /**
153 * Read an unsigned element. Possible bases are 2, 10, 16
154 * Base is indicated by the first two chars.
155 */
156 RamUnsigned readRamUnsigned(const std::string& element, std::size_t& charactersRead) {
157 // Sanity check
158 assert(element.size() > 0);
159
160 RamSigned value = 0;
161
162 // Check prefix and parse the input.
163 if (isPrefix("0b", element)) {
164 value = RamUnsignedFromString(element, &charactersRead, 2);
165 } else if (isPrefix("0x", element)) {
166 value = RamUnsignedFromString(element, &charactersRead, 16);
167 } else {
168 value = RamUnsignedFromString(element, &charactersRead);
169 }
170 return value;
171 }
172
173 std::string nextElement(std::string& line, std::size_t& start, bool& wasCRLF) {
174 std::string element;
175
176 if (rfc4180) {
177 if (line[start] == '"') {
178 // quoted field
179 std::size_t end = line.length();
180 std::size_t pos = start + 1;
181 bool foundEndQuote = false;
182 while (!foundEndQuote) {
183 if (pos == end) {
184 bool newWasCRLF = false;
185 if (!readNextLine(line, newWasCRLF)) {
186 break;
187 }
188 // account for \r\n or \n that we had previously
189 // read and thrown out.
190 // since we're in a quote, we should restore
191 // what the user provided
192 if (wasCRLF) {
193 element.push_back('\r');
194 }
195 element.push_back('\n');
196
197 // remember if we just read a CRLF sequence
198 wasCRLF = newWasCRLF;
199
200 // start over
201 pos = 0;
202 end = line.length();
203 }
204 if (pos == end) {
205 // this means we've got a blank line and we need to read
206 // more
207 continue;
208 }
209
210 char c = line[pos++];
211 if (c == '"' && (pos < end) && line[pos] == '"') {
212 // two double-quote => one double-quote
213 element.push_back('"');
214 ++pos;
215 } else if (c == '"') {
216 foundEndQuote = true;
217 } else {
218 element.push_back(c);
219 }
220 }
221
222 if (!foundEndQuote) {
223 // missing closing quote
224 std::stringstream errorMessage;
225 errorMessage << "Unbalanced field quote in line " << lineNumber << "; ";
226 throw std::invalid_argument(errorMessage.str());
227 }
228
229 // field must be immediately followed by delimiter or end of line
230 if (pos != line.length()) {
231 std::size_t nextDelimiter = line.find(delimiter, pos);
232 if (nextDelimiter != pos) {
233 std::stringstream errorMessage;
234 errorMessage << "Separator expected immediately after quoted field in line "
235 << lineNumber << "; ";
236 throw std::invalid_argument(errorMessage.str());
237 }
238 }
239
240 start = pos + delimiter.size();
241 return element;
242 } else {
243 // non-quoted field, span until next delimiter or end of line
244 const std::size_t end = std::min(line.find(delimiter, start), line.length());
245 element = line.substr(start, end - start);
246 start = end + delimiter.size();
247
248 return element;
249 }
250 }
251
252 std::size_t end = start;
253 // Handle record/tuple delimiter coincidence.
254 if (delimiter.find(',') != std::string::npos) {
255 int record_parens = 0;
256 std::size_t next_delimiter = line.find(delimiter, start);
257
258 // Find first delimiter after the record.
259 while (end < std::min(next_delimiter, line.length()) || record_parens != 0) {
260 // Track the number of parenthesis.
261 if (line[end] == '[') {
262 ++record_parens;
263 } else if (line[end] == ']') {
264 --record_parens;
265 }
266
267 // Check for unbalanced parenthesis.
268 if (record_parens < 0) {
269 break;
270 };
271
272 ++end;
273
274 // Find a next delimiter if the old one is invalid.
275 // But only if inside the unbalance parenthesis.
276 if (end == next_delimiter && record_parens != 0) {
277 next_delimiter = line.find(delimiter, end);
278 }
279 }
280
281 // Handle the end-of-the-line case where parenthesis are unbalanced.
282 if (record_parens != 0) {
283 std::stringstream errorMessage;
284 errorMessage << "Unbalanced record parenthesis in line " << lineNumber << "; ";
285 throw std::invalid_argument(errorMessage.str());
286 }
287 } else {
288 end = std::min(line.find(delimiter, start), line.length());
289 }
290
291 // Check for missing value.
292 if (start > end) {
293 std::stringstream errorMessage;
294 errorMessage << "Values missing in line " << lineNumber << "; ";
295 throw std::invalid_argument(errorMessage.str());
296 }
297
298 element = line.substr(start, end - start);
299 start = end + delimiter.size();
300
301 return element;
302 }
303
304 std::map<int, int> getInputColumnMap(
305 const std::map<std::string, std::string>& rwOperation, const unsigned arity_) const {
306 std::string columnString = getOr(rwOperation, "columns", "");
307 std::map<int, int> inputColumnMap;
308
309 if (!columnString.empty()) {
310 std::istringstream iss(columnString);
311 std::string mapping;
312 int index = 0;
313 while (std::getline(iss, mapping, ':')) {
314 inputColumnMap[stoi(mapping)] = index++;
315 }
316 if (inputColumnMap.size() < arity_) {
317 throw std::invalid_argument("Invalid column set was given: <" + columnString + ">");
318 }
319 } else {
320 while (inputColumnMap.size() < arity_) {
321 int size = static_cast<int>(inputColumnMap.size());
322 inputColumnMap[size] = size;
323 }
324 }
325 return inputColumnMap;
326 }
327
328 const bool rfc4180;
329 const std::string delimiter;
330 std::istream& file;
331 std::size_t lineNumber;
332 std::map<int, int> inputMap;
333};
334
335class ReadFileCSV : public ReadStreamCSV {
336public:
337 ReadFileCSV(const std::map<std::string, std::string>& rwOperation, SymbolTable& symbolTable,
338 RecordTable& recordTable)
339 : ReadStreamCSV(fileHandle, rwOperation, symbolTable, recordTable),
340 baseName(souffle::baseName(getFileName(rwOperation))),
341 fileHandle(getFileName(rwOperation), std::ios::in | std::ios::binary) {
342 if (!fileHandle.is_open()) {
343 // suppress error message in case file cannot be open when flag -w is set
344 if (getOr(rwOperation, "no-warn", "false") != "true") {
345 throw std::invalid_argument("Cannot open fact file " + baseName + "\n");
346 }
347 }
348 // Strip headers if we're using them
349 if (getOr(rwOperation, "headers", "false") == "true") {
350 std::string line;
351 getline(file, line);
352 }
353 }
354
355 /**
356 * Read and return the next tuple.
357 *
358 * Returns nullptr if no tuple was readable.
359 * @return
360 */
361 Own<RamDomain[]> readNextTuple() override {
362 try {
363 return ReadStreamCSV::readNextTuple();
364 } catch (std::exception& e) {
365 std::stringstream errorMessage;
366 errorMessage << e.what();
367 errorMessage << "cannot parse fact file " << baseName << "!\n";
368 throw std::invalid_argument(errorMessage.str());
369 }
370 }
371
372 ~ReadFileCSV() override = default;
373
374protected:
375 /**
376 * Return given filename or construct from relation name.
377 * Default name is [configured path]/[relation name].facts
378 *
379 * @param rwOperation map of IO configuration options
380 * @return input filename
381 */
382 static std::string getFileName(const std::map<std::string, std::string>& rwOperation) {
383 auto name = getOr(rwOperation, "filename", rwOperation.at("name") + ".facts");
384 if (!isAbsolute(name)) {
385 name = getOr(rwOperation, "fact-dir", ".") + pathSeparator + name;
386 }
387 return name;
388 }
389
390 std::string baseName;
391#ifdef USE_LIBZ
392 gzfstream::igzfstream fileHandle;
393#else
394 std::ifstream fileHandle;
395#endif
396};
397
398class ReadCinCSVFactory : public ReadStreamFactory {
399public:
400 Own<ReadStream> getReader(const std::map<std::string, std::string>& rwOperation, SymbolTable& symbolTable,
401 RecordTable& recordTable) override {
402 return mk<ReadStreamCSV>(std::cin, rwOperation, symbolTable, recordTable);
403 }
404
405 const std::string& getName() const override {
406 static const std::string name = "stdin";
407 return name;
408 }
409 ~ReadCinCSVFactory() override = default;
410};
411
412class ReadFileCSVFactory : public ReadStreamFactory {
413public:
414 Own<ReadStream> getReader(const std::map<std::string, std::string>& rwOperation, SymbolTable& symbolTable,
415 RecordTable& recordTable) override {
416 return mk<ReadFileCSV>(rwOperation, symbolTable, recordTable);
417 }
418
419 const std::string& getName() const override {
420 static const std::string name = "file";
421 return name;
422 }
423
424 ~ReadFileCSVFactory() override = default;
425};
426
427} /* namespace souffle */