Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parquet support for structured output #15311

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
25 changes: 24 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ endif ()

# C++17 is requested below; Google Test >= 1.13 needs at least C++14 (presumably the bump to 17 is for Arrow/Parquet - TODO confirm).
# CMake will silently fall back to an older standard if the requested one is not supported by the compiler.
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD 17)

# compiler specific flags
if (GNU_COMPILER OR CLANG_COMPILER)
Expand Down Expand Up @@ -532,6 +532,8 @@ if (EIGEN3_FOUND)
set(ENABLED_FEATURES "${ENABLED_FEATURES} Eigen")
endif (EIGEN3_FOUND)

option(WITH_PARQUET "Compile with Parquet support" ON)

if (CHECK_OPTIONAL_LIBS)
file(GLOB GDAL_PATH "${SUMO_LIBRARIES}/gdal-?.?.?")
file(GLOB FFMPEG_PATH "${SUMO_LIBRARIES}/FFMPEG-?.?.?")
Expand All @@ -541,6 +543,27 @@ if (CHECK_OPTIONAL_LIBS)
set(CMAKE_PREFIX_PATH "${CMAKE_PREFIX_PATH};${GDAL_PATH};${FFMPEG_PATH};${OSG_PATH};${GL2PS_PATH};${GEOS_PATH}")
file(GLOB SUMO_OPTIONAL_LIBRARIES_DLL "${GDAL_PATH}/bin/*.dll" "${FFMPEG_PATH}/bin/*.dll" "${OSG_PATH}/bin/*.dll" "${GL2PS_PATH}/bin/*.dll" "${JUPEDSIM_CUSTOMDIR}/bin/*.dll")

# Optional Parquet output support.
# Detection only runs when WITH_PARQUET is enabled and needs both the Arrow and
# the Parquet package. The outcome is collected in a temporary flag so that the
# result variables (HAVE_PARQUET, PARQUET_LIBRARY, ENABLED_FEATURES) are
# assigned in exactly one place.
set(_parquet_usable FALSE)
if (WITH_PARQUET)
    find_package(Arrow)
    find_package(Parquet)
    if (Arrow_FOUND AND Parquet_FOUND)
        set(_parquet_usable TRUE)
    else ()
        message(WARNING "Parquet support requested but Arrow or Parquet libraries not found. Disabling Parquet support.")
    endif ()
endif ()

if (_parquet_usable)
    set(HAVE_PARQUET 1)
    set(ENABLED_FEATURES "${ENABLED_FEATURES} PARQUET")
    set(PARQUET_LIBRARY Parquet::parquet_shared)
    # Disabled snippet kept from the original ("this is for gtest compatibility"):
    # if (GTEST_FOUND)
    #     add_definitions("GTest_SOURCE=Bundled")
    # endif()
else ()
    # Either not requested or not found: make sure nothing Parquet-related is linked or defined.
    set(PARQUET_LIBRARY "")
    set(HAVE_PARQUET 0)
endif ()
unset(_parquet_usable)

# GDAL (for geopositioning)
find_package(GDAL)
if (GDAL_FOUND)
Expand Down
8 changes: 7 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
set(netconvertlibs
netwrite netimport netbuild foreign_eulerspiral ${GDAL_LIBRARY} netimport_vissim netimport_vissim_typeloader netimport_vissim_tempstructs ${commonlibs} ${TCMALLOC_LIBRARY})

# Look up a library named "parquet" when Parquet support was requested and store
# the result in PARQUET_LIBRARY, which is appended to sumolibs below.
# NOTE(review): the top-level CMakeLists.txt also assigns PARQUET_LIBRARY
# (Parquet::parquet_shared when Arrow and Parquet are found, "" otherwise) - confirm how
# this find_library call interacts with that value, and what ${PARQUET_LIBRARY} expands to
# in sumolibs when no library is found here.
if(WITH_PARQUET)
FIND_LIBRARY(PARQUET_LIBRARY NAMES parquet)
endif()

set(sumolibs
traciserver netload microsim_cfmodels microsim_engine microsim_lcmodels microsim_devices microsim_trigger microsim_output microsim_transportables microsim_actions
microsim_traffic_lights microsim mesosim ${commonvehiclelibs} ${GEOS_LIBRARY})
microsim_traffic_lights microsim mesosim ${commonvehiclelibs} ${GEOS_LIBRARY} ${PARQUET_LIBRARY})

if (OPENSCENEGRAPH_FOUND)
set(osgviewlib osgview)
endif ()
Expand Down Expand Up @@ -57,6 +62,7 @@ add_executable(sumo sumo_main.cpp)
set_target_properties(sumo PROPERTIES OUTPUT_NAME sumo${BINARY_SUFFIX})
set_target_properties(sumo PROPERTIES OUTPUT_NAME_DEBUG sumo${BINARY_SUFFIX}D)
target_link_libraries(sumo microsim traciserver libsumostatic ${sumolibs} ${TCMALLOC_LIBRARY})

add_dependencies(sumo generate-version-h install_dll)

if (FOX_FOUND)
Expand Down
3 changes: 3 additions & 0 deletions src/config.h.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@
/* defined if GDAL is available */
#cmakedefine HAVE_GDAL

/* defined if PARQUET is available */
# cmakedefine HAVE_PARQUET

/* defined if GL2PS is available */
#cmakedefine HAVE_GL2PS

Expand Down
4 changes: 2 additions & 2 deletions src/microsim/output/MSVTKExport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ MSVTKExport::write(OutputDevice& of, SUMOTime /* timestep */) {
of << "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n";
of << "<VTKFile type=\"PolyData\" version=\"0.1\" order=\"LittleEndian\">\n";
of << "<PolyData>\n";
of << " <Piece NumberOfPoints=\"" << speed.size() << "\" NumberOfVerts=\"1\" NumberOfLines=\"0\" NumberOfStrips=\"0\" NumberOfPolys=\"0\">\n";
of << " <Piece NumberOfPoints=\"" << toString(speed.size()) << "\" NumberOfVerts=\"1\" NumberOfLines=\"0\" NumberOfStrips=\"0\" NumberOfPolys=\"0\">\n";
of << "<PointData>\n";
of << " <DataArray type=\"Float64\" Name=\"speed\" format=\"ascii\">" << List2String(getSpeed()) << "</DataArray>\n";
of << "</PointData>\n";
Expand All @@ -56,7 +56,7 @@ MSVTKExport::write(OutputDevice& of, SUMOTime /* timestep */) {
of << "</Points>\n";
of << "<Verts>\n";
of << " <DataArray type=\"Int64\" Name=\"connectivity\" format=\"ascii\">" << getOffset((int) speed.size()) << "</DataArray>\n";
of << " <DataArray type=\"Int64\" Name=\"offsets\" format=\"ascii\">" << speed.size() << "</DataArray>\n";
of << " <DataArray type=\"Int64\" Name=\"offsets\" format=\"ascii\">" << toString(speed.size()) << "</DataArray>\n";
of << "</Verts>\n";
of << "<Lines>\n";
of << " <DataArray type=\"Int64\" Name=\"connectivity\" format=\"ascii\"/>\n";
Expand Down
9 changes: 9 additions & 0 deletions src/utils/common/FileHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ FileHelpers::addExtension(const std::string& path, const std::string& extension)
}
}

/* Returns the extension of the last path component, including the leading dot
 * (e.g. ".xml"); for several dots only the last one counts ("a.xml.gz" -> ".gz").
 * Returns an empty string if the last path component contains no dot.
 */
std::string
FileHelpers::getExtension(const std::string& path) {
    const auto dot = path.find_last_of('.');
    if (dot == std::string::npos) {
        return "";
    }
    // A dot that lies before the last path separator belongs to a directory
    // name (e.g. "out.dir/file") and must not be reported as a file extension.
    const auto sep = path.find_last_of("/\\");
    if (sep != std::string::npos && dot < sep) {
        return "";
    }
    return path.substr(dot);
}


std::string
FileHelpers::getConfigurationRelative(const std::string& configPath, const std::string& path) {
Expand Down
7 changes: 7 additions & 0 deletions src/utils/common/FileHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class FileHelpers {
*/
static std::string addExtension(const std::string& path, const std::string& extension);

/** @brief Get the file extension from the given file path
 *
 * Only the last '.' is considered, so for "out.xml.gz" the result is ".gz".
 *
 * @param[in] path The path to the file
 * @return the file extension (with dot, example: '.xml'), or an empty string
 *         if the path contains no '.'
 */
static std::string getExtension(const std::string& path);

/** @brief Returns the second path as a relative path to the first file
*
* Given the position of the configuration file, and the information where a second
Expand Down
30 changes: 7 additions & 23 deletions src/utils/common/MsgRetrievingFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,23 @@ class MsgRetrievingFunction : public OutputDevice {
MsgRetrievingFunction(T* object, Operation operation, MsgHandler::MsgType type) :
myObject(object),
myOperation(operation),
myMsgType(type) {}
myMsgType(type) {
/// @todo We should design a new formatter type for this.
myFormatter = new PlainXMLFormatter();
myStreamDevice = new OStreamDevice(new std::ostringstream());
}


/// @brief Destructor
~MsgRetrievingFunction() {}


protected:
/// @name Methods that override/implement OutputDevice-methods
/// @{

/** @brief Returns the associated ostream
*
* The stream is an ostringstream, actually, into which the message
* is written. It is sent when postWriteHook is called.
*
* @return The used stream
* @see postWriteHook
*/
std::ostream& getOStream() {
return myMessage;
}


/** @brief Sends the data which was written to the string stream via the retrieving function.
*/
virtual void postWriteHook() {
(myObject->*myOperation)(myMsgType, myMessage.str());
myMessage.str("");
(myObject->*myOperation)(myMsgType, getOStream().str());
getOStream().str("");
}
/// @}

Expand All @@ -92,8 +80,4 @@ class MsgRetrievingFunction : public OutputDevice {

/// @brief The type of message to retrieve.
MsgHandler::MsgType myMsgType;

/// @brief message buffer
std::ostringstream myMessage;

};
17 changes: 7 additions & 10 deletions src/utils/gui/div/GUIMessageWindow.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,29 +124,26 @@ class GUIMessageWindow : public FXText {
/// @brief constructor
MsgOutputDevice(GUIMessageWindow* msgWindow, GUIEventType type) :
myMsgWindow(msgWindow),
myType(type) { }
myType(type){
/// @todo We should design a new formatter type for this.
myFormatter = new PlainXMLFormatter();
myStreamDevice = new OStreamDevice(new std::ostringstream());
}

/// @brief destructor
~MsgOutputDevice() { }

protected:
/// @brief get Output Stream
std::ostream& getOStream() {
return myStream;
}
/// @brief write hook
void postWriteHook() {
myMsgWindow->appendMsg(myType, myStream.str());
myStream.str("");
myMsgWindow->appendMsg(myType, getOStream().str());
getOStream().str("");
}

private:
/// @brief pointer to message Windows
GUIMessageWindow* myMsgWindow;

/// @brief output string stream
std::ostringstream myStream;

/// @brief type of event
GUIEventType myType;
};
Expand Down
8 changes: 6 additions & 2 deletions src/utils/iodevices/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ set(utils_iodevices_STAT_SRCS
OutputDevice_File.h
OutputDevice_String.cpp
OutputDevice_String.h
OutputDevice_Network.cpp
OutputDevice_Network.h
# OutputDevice_Network.cpp
# OutputDevice_Network.h
OutputDevice_Parquet.cpp
OutputDevice_Parquet.h
OutputFormatter.h
PlainXMLFormatter.cpp
PlainXMLFormatter.h
StreamDevices.h

)

add_library(utils_iodevices STATIC ${utils_iodevices_STAT_SRCS})
Expand Down
49 changes: 31 additions & 18 deletions src/utils/iodevices/OutputDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "OutputDevice_COUT.h"
#include "OutputDevice_CERR.h"
#include "OutputDevice_Network.h"
#include "OutputDevice_Parquet.h"
#include "PlainXMLFormatter.h"
#include <utils/common/StringUtils.h>
#include <utils/common/UtilExceptions.h>
Expand Down Expand Up @@ -77,17 +78,18 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) {
} else if (name == "stderr") {
dev = OutputDevice_CERR::getDevice();
} else if (FileHelpers::isSocket(name)) {
try {
const bool ipv6 = name[0] == '['; // IPv6 adresses may be written like '[::1]:8000'
const size_t sepIndex = name.find(":", ipv6 ? name.find("]") : 0);
const int port = StringUtils::toInt(name.substr(sepIndex + 1));
dev = new OutputDevice_Network(ipv6 ? name.substr(1, sepIndex - 2) : name.substr(0, sepIndex), port);
} catch (NumberFormatException&) {
throw IOError("Given port number '" + name.substr(name.find(":") + 1) + "' is not numeric.");
} catch (EmptyData&) {
throw IOError(TL("No port number given."));
}
} else {
// try {
// const bool ipv6 = name[0] == '['; // IPv6 adresses may be written like '[::1]:8000'
// const size_t sepIndex = name.find(":", ipv6 ? name.find("]") : 0);
// const int port = StringUtils::toInt(name.substr(sepIndex + 1));
// dev = new OutputDevice_Network(ipv6 ? name.substr(1, sepIndex - 2) : name.substr(0, sepIndex), port);
// } catch (NumberFormatException&) {
// throw IOError("Given port number '" + name.substr(name.find(":") + 1) + "' is not numeric.");
// } catch (EmptyData&) {
throw IOError(TL("No port number given."));
// }
}
else {
std::string name2 = (name == "nul" || name == "NUL") ? "/dev/null" : name;
if (usePrefix && OptionsCont::getOptions().isSet("output-prefix") && name2 != "/dev/null") {
std::string prefix = OptionsCont::getOptions().getString("output-prefix");
Expand All @@ -102,11 +104,23 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) {
name2 = FileHelpers::prependToLastPathComponent(prefix, name);
}
name2 = StringUtils::substituteEnvironment(name2, &OptionsIO::getLoadTime());
// check the file extension
const auto file_ext = FileHelpers::getExtension(name);
const int len = (int)name.length();
dev = new OutputDevice_File(name2, len > 3 && name.substr(len - 3) == ".gz");
if (file_ext == ".parquet" || file_ext == ".prq") {
#ifdef HAVE_PARQUET
dev = new OutputDevice_Parquet(name2);
#else
throw IOError(TL("Parquet output is not supported in this build."));
#endif
}
else {
dev = new OutputDevice_File(name2, len > 3 && FileHelpers::getExtension(name) == ".gz");
}
}
// todo: extract this to a class method? (b.c. Parquet doesn't have an iostream)
dev->setPrecision();
dev->getOStream() << std::setiosflags(std::ios::fixed);
dev->setOSFlags(std::ios::fixed);
myOutputDevices[name] = dev;
return *dev;
}
Expand Down Expand Up @@ -207,9 +221,8 @@ OutputDevice::OutputDevice(const int defaultIndentation, const std::string& file
myFilename(filename), myFormatter(new PlainXMLFormatter(defaultIndentation)) {
}


OutputDevice::~OutputDevice() {
delete myFormatter;
OutputDevice::OutputDevice(const std::string& filename, OutputFormatter* formatter) :
myFilename(filename), myFormatter(formatter) {
}


Expand Down Expand Up @@ -240,13 +253,13 @@ OutputDevice::close() {

void
OutputDevice::setPrecision(int precision) {
getOStream() << std::setprecision(precision);
getOStream().setPrecision(precision);
}


int
OutputDevice::precision() {
return (int)getOStream().precision();
return getOStream().precision();
}


Expand Down
Loading
Loading