Cycles Network rendering, remove some exception throwing, replace with saner error handling

This patch adds a network_error() function more alike how other devices handle error's

- it adds a check for errors on load_kernels to make sure we do not crash if rendering without a server.
- it uses the non throwing variation of boost::asio::read.

Reviewers: brecht

Reviewed By: brecht

CC: brecht

Differential Revision: https://developer.blender.org/D86
This commit is contained in:
Martijn Berger
2014-02-05 21:55:51 +01:00
parent aa3d88d34d
commit 0f91f56ce3
2 changed files with 115 additions and 47 deletions

View File

@@ -53,6 +53,7 @@ public:
NetworkDevice(DeviceInfo& info, Stats &stats, const char *address) NetworkDevice(DeviceInfo& info, Stats &stats, const char *address)
: Device(info, stats, true), socket(io_service) : Device(info, stats, true), socket(io_service)
{ {
error_func = NetworkError();
stringstream portstr; stringstream portstr;
portstr << SERVER_PORT; portstr << SERVER_PORT;
@@ -69,14 +70,14 @@ public:
} }
if(error) if(error)
throw boost::system::system_error(error); error_func.network_error(error.message());
mem_counter = 0; mem_counter = 0;
} }
~NetworkDevice() ~NetworkDevice()
{ {
RPCSend snd(socket, "stop"); RPCSend snd(socket, &error_func, "stop");
snd.write(); snd.write();
} }
@@ -86,7 +87,7 @@ public:
mem.device_pointer = ++mem_counter; mem.device_pointer = ++mem_counter;
RPCSend snd(socket, "mem_alloc"); RPCSend snd(socket, &error_func, "mem_alloc");
snd.add(mem); snd.add(mem);
snd.add(type); snd.add(type);
@@ -97,7 +98,7 @@ public:
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "mem_copy_to"); RPCSend snd(socket, &error_func, "mem_copy_to");
snd.add(mem); snd.add(mem);
snd.write(); snd.write();
@@ -110,7 +111,7 @@ public:
size_t data_size = mem.memory_size(); size_t data_size = mem.memory_size();
RPCSend snd(socket, "mem_copy_from"); RPCSend snd(socket, &error_func, "mem_copy_from");
snd.add(mem); snd.add(mem);
snd.add(y); snd.add(y);
@@ -119,7 +120,7 @@ public:
snd.add(elem); snd.add(elem);
snd.write(); snd.write();
RPCReceive rcv(socket); RPCReceive rcv(socket, &error_func);
rcv.read_buffer((void*)mem.data_pointer, data_size); rcv.read_buffer((void*)mem.data_pointer, data_size);
} }
@@ -127,7 +128,7 @@ public:
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "mem_zero"); RPCSend snd(socket, &error_func, "mem_zero");
snd.add(mem); snd.add(mem);
snd.write(); snd.write();
@@ -138,7 +139,7 @@ public:
if(mem.device_pointer) { if(mem.device_pointer) {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "mem_free"); RPCSend snd(socket, &error_func, "mem_free");
snd.add(mem); snd.add(mem);
snd.write(); snd.write();
@@ -151,7 +152,7 @@ public:
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "const_copy_to"); RPCSend snd(socket, &error_func, "const_copy_to");
string name_string(name); string name_string(name);
@@ -167,7 +168,7 @@ public:
mem.device_pointer = ++mem_counter; mem.device_pointer = ++mem_counter;
RPCSend snd(socket, "tex_alloc"); RPCSend snd(socket, &error_func, "tex_alloc");
string name_string(name); string name_string(name);
@@ -184,7 +185,7 @@ public:
if(mem.device_pointer) { if(mem.device_pointer) {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "tex_free"); RPCSend snd(socket, &error_func, "tex_free");
snd.add(mem); snd.add(mem);
snd.write(); snd.write();
@@ -195,14 +196,17 @@ public:
bool load_kernels(bool experimental) bool load_kernels(bool experimental)
{ {
if(error_func.have_error())
return false;
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "load_kernels"); RPCSend snd(socket, &error_func, "load_kernels");
snd.add(experimental); snd.add(experimental);
snd.write(); snd.write();
bool result; bool result;
RPCReceive rcv(socket); RPCReceive rcv(socket, &error_func);
rcv.read(result); rcv.read(result);
return result; return result;
@@ -214,7 +218,7 @@ public:
the_task = task; the_task = task;
RPCSend snd(socket, "task_add"); RPCSend snd(socket, &error_func, "task_add");
snd.add(task); snd.add(task);
snd.write(); snd.write();
} }
@@ -223,7 +227,7 @@ public:
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "task_wait"); RPCSend snd(socket, &error_func, "task_wait");
snd.write(); snd.write();
lock.unlock(); lock.unlock();
@@ -232,10 +236,13 @@ public:
/* todo: run this threaded for connecting to multiple clients */ /* todo: run this threaded for connecting to multiple clients */
for(;;) { for(;;) {
if(error_func.have_error())
break;
RenderTile tile; RenderTile tile;
lock.lock(); lock.lock();
RPCReceive rcv(socket); RPCReceive rcv(socket, &error_func);
if(rcv.name == "acquire_tile") { if(rcv.name == "acquire_tile") {
lock.unlock(); lock.unlock();
@@ -245,14 +252,14 @@ public:
the_tiles.push_back(tile); the_tiles.push_back(tile);
lock.lock(); lock.lock();
RPCSend snd(socket, "acquire_tile"); RPCSend snd(socket, &error_func, "acquire_tile");
snd.add(tile); snd.add(tile);
snd.write(); snd.write();
lock.unlock(); lock.unlock();
} }
else { else {
lock.lock(); lock.lock();
RPCSend snd(socket, "acquire_tile_none"); RPCSend snd(socket, &error_func, "acquire_tile_none");
snd.write(); snd.write();
lock.unlock(); lock.unlock();
} }
@@ -272,7 +279,7 @@ public:
the_task.release_tile(tile); the_task.release_tile(tile);
lock.lock(); lock.lock();
RPCSend snd(socket, "release_tile"); RPCSend snd(socket, &error_func, "release_tile");
snd.write(); snd.write();
lock.unlock(); lock.unlock();
} }
@@ -288,9 +295,12 @@ public:
void task_cancel() void task_cancel()
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "task_cancel"); RPCSend snd(socket, &error_func, "task_cancel");
snd.write(); snd.write();
} }
private:
NetworkError error_func;
}; };
Device *device_network_create(DeviceInfo& info, Stats &stats, const char *address) Device *device_network_create(DeviceInfo& info, Stats &stats, const char *address)
@@ -316,9 +326,16 @@ class DeviceServer {
public: public:
thread_mutex rpc_lock; thread_mutex rpc_lock;
void network_error(const string &message){
error_func.network_error(message);
}
bool have_error() { return error_func.have_error(); }
DeviceServer(Device *device_, tcp::socket& socket_) DeviceServer(Device *device_, tcp::socket& socket_)
: device(device_), socket(socket_), stop(false), blocked_waiting(false) : device(device_), socket(socket_), stop(false), blocked_waiting(false)
{ {
error_func = NetworkError();
} }
void listen() void listen()
@@ -336,7 +353,7 @@ protected:
void listen_step() void listen_step()
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCReceive rcv(socket); RPCReceive rcv(socket, &error_func);
if(rcv.name == "stop") if(rcv.name == "stop")
stop = true; stop = true;
@@ -493,7 +510,7 @@ protected:
size_t data_size = mem.memory_size(); size_t data_size = mem.memory_size();
RPCSend snd(socket); RPCSend snd(socket, &error_func, "mem_copy_from");
snd.write(); snd.write();
snd.write_buffer((uint8_t*)mem.data_pointer, data_size); snd.write_buffer((uint8_t*)mem.data_pointer, data_size);
lock.unlock(); lock.unlock();
@@ -588,7 +605,7 @@ protected:
bool result; bool result;
result = device->load_kernels(experimental); result = device->load_kernels(experimental);
RPCSend snd(socket); RPCSend snd(socket, &error_func, "load_kernels");
snd.add(result); snd.add(result);
snd.write(); snd.write();
lock.unlock(); lock.unlock();
@@ -631,7 +648,7 @@ protected:
blocked_waiting = false; blocked_waiting = false;
lock.lock(); lock.lock();
RPCSend snd(socket, "task_wait_done"); RPCSend snd(socket, &error_func, "task_wait_done");
snd.write(); snd.write();
lock.unlock(); lock.unlock();
} }
@@ -670,7 +687,7 @@ protected:
bool result = false; bool result = false;
RPCSend snd(socket, "acquire_tile"); RPCSend snd(socket, &error_func, "acquire_tile");
snd.write(); snd.write();
do { do {
@@ -700,7 +717,7 @@ protected:
cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n"; cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n";
} }
} }
} while(acquire_queue.empty() && !stop); } while(acquire_queue.empty() && !stop && !have_error());
return result; return result;
} }
@@ -724,7 +741,7 @@ protected:
{ {
thread_scoped_lock lock(rpc_lock); thread_scoped_lock lock(rpc_lock);
RPCSend snd(socket, "release_tile"); RPCSend snd(socket, &error_func, "release_tile");
snd.add(tile); snd.add(tile);
snd.write(); snd.write();
lock.unlock(); lock.unlock();
@@ -776,8 +793,11 @@ protected:
bool stop; bool stop;
bool blocked_waiting; bool blocked_waiting;
private:
NetworkError error_func;
/* todo: free memory and device (osl) on network error */ /* todo: free memory and device (osl) on network error */
}; };
void Device::server_run() void Device::server_run()

View File

@@ -21,6 +21,8 @@
#include <boost/archive/text_iarchive.hpp> #include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp> #include <boost/archive/text_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/array.hpp> #include <boost/array.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
@@ -53,6 +55,14 @@ static const int DISCOVER_PORT = 5121;
static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP"; static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP"; static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
#if 0
typedef boost::archive::text_oarchive o_archive;
typedef boost::archive::text_iarchive i_archive;
#else
typedef boost::archive::binary_oarchive o_archive;
typedef boost::archive::binary_iarchive i_archive;
#endif
/* Serialization of device memory */ /* Serialization of device memory */
class network_device_memory : public device_memory class network_device_memory : public device_memory
@@ -64,15 +74,40 @@ public:
vector<char> local_data; vector<char> local_data;
}; };
/* Common netowrk error function / object for both DeviceNetwork and DeviceServer*/
class NetworkError {
public:
NetworkError() {
error = "";
error_count = 0;
}
~NetworkError() {}
void network_error(const string& message) {
error = message;
error_count += 1;
}
bool have_error() {
return true ? error_count > 0 : false;
}
private:
string error;
int error_count;
};
/* Remote procedure call Send */ /* Remote procedure call Send */
class RPCSend { class RPCSend {
public: public:
RPCSend(tcp::socket& socket_, const string& name_ = "") RPCSend(tcp::socket& socket_, NetworkError* e, const string& name_ = "")
: name(name_), socket(socket_), archive(archive_stream), sent(false) : name(name_), socket(socket_), archive(archive_stream), sent(false)
{ {
archive & name_; archive & name_;
error_func = e;
fprintf(stderr, "rpc send %s\n", name.c_str()); fprintf(stderr, "rpc send %s\n", name.c_str());
} }
@@ -94,7 +129,6 @@ public:
void add(const DeviceTask& task) void add(const DeviceTask& task)
{ {
int type = (int)task.type; int type = (int)task.type;
archive & type & task.x & task.y & task.w & task.h; archive & type & task.x & task.y & task.w & task.h;
archive & task.rgba_byte & task.rgba_half & task.buffer & task.sample & task.num_samples; archive & task.rgba_byte & task.rgba_half & task.buffer & task.sample & task.num_samples;
archive & task.offset & task.stride; archive & task.offset & task.stride;
@@ -128,7 +162,7 @@ public:
boost::asio::transfer_all(), error); boost::asio::transfer_all(), error);
if(error.value()) if(error.value())
cout << "Network send error: " << error.message() << "\n"; error_func->network_error(error.message());
/* then send actual data */ /* then send actual data */
boost::asio::write(socket, boost::asio::write(socket,
@@ -136,7 +170,7 @@ public:
boost::asio::transfer_all(), error); boost::asio::transfer_all(), error);
if(error.value()) if(error.value())
cout << "Network send error: " << error.message() << "\n"; error_func->network_error(error.message());
sent = true; sent = true;
} }
@@ -150,27 +184,34 @@ public:
boost::asio::transfer_all(), error); boost::asio::transfer_all(), error);
if(error.value()) if(error.value())
cout << "Network send error: " << error.message() << "\n"; error_func->network_error(error.message());
} }
protected: protected:
string name; string name;
tcp::socket& socket; tcp::socket& socket;
ostringstream archive_stream; ostringstream archive_stream;
boost::archive::text_oarchive archive; o_archive archive;
bool sent; bool sent;
NetworkError *error_func;
}; };
/* Remote procedure call Receive */ /* Remote procedure call Receive */
class RPCReceive { class RPCReceive {
public: public:
RPCReceive(tcp::socket& socket_) RPCReceive(tcp::socket& socket_, NetworkError* e )
: socket(socket_), archive_stream(NULL), archive(NULL) : socket(socket_), archive_stream(NULL), archive(NULL)
{ {
error_func = e;
/* read head with fixed size */ /* read head with fixed size */
vector<char> header(8); vector<char> header(8);
size_t len = boost::asio::read(socket, boost::asio::buffer(header)); boost::system::error_code error;
size_t len = boost::asio::read(socket, boost::asio::buffer(header), error);
if(error.value()){
error_func->network_error(error.message());
}
/* verify if we got something */ /* verify if we got something */
if(len == header.size()) { if(len == header.size()) {
@@ -183,30 +224,31 @@ public:
if((header_stream >> hex >> data_size)) { if((header_stream >> hex >> data_size)) {
vector<char> data(data_size); vector<char> data(data_size);
size_t len = boost::asio::read(socket, boost::asio::buffer(data)); size_t len = boost::asio::read(socket, boost::asio::buffer(data), error);
if(error.value())
error_func->network_error(error.message());
if(len == data_size) { if(len == data_size) {
archive_str = (data.size())? string(&data[0], data.size()): string(""); archive_str = (data.size())? string(&data[0], data.size()): string("");
#if 0
istringstream archive_stream(archive_str);
boost::archive::text_iarchive archive(archive_stream);
#endif
archive_stream = new istringstream(archive_str); archive_stream = new istringstream(archive_str);
archive = new boost::archive::text_iarchive(*archive_stream); archive = new i_archive(*archive_stream);
*archive & name; *archive & name;
fprintf(stderr, "rpc receive %s\n", name.c_str()); fprintf(stderr, "rpc receive %s\n", name.c_str());
} }
else { else {
cout << "Network receive error: data size doesn't match header\n"; error_func->network_error("Network receive error: data size doesn't match header");
} }
} }
else { else {
cout << "Network receive error: can't decode data size from header\n"; error_func->network_error("Network receive error: can't decode data size from header");
} }
} }
else { else {
cout << "Network receive error: invalid header size\n"; error_func->network_error("Network receive error: invalid header size");
} }
} }
@@ -231,7 +273,12 @@ public:
void read_buffer(void *buffer, size_t size) void read_buffer(void *buffer, size_t size)
{ {
size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size)); boost::system::error_code error;
size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size), error);
if(error.value()){
error_func->network_error(error.message());
}
if(len != size) if(len != size)
cout << "Network receive error: buffer size doesn't match expected size\n"; cout << "Network receive error: buffer size doesn't match expected size\n";
@@ -267,7 +314,8 @@ protected:
tcp::socket& socket; tcp::socket& socket;
string archive_str; string archive_str;
istringstream *archive_stream; istringstream *archive_stream;
boost::archive::text_iarchive *archive; i_archive *archive;
NetworkError *error_func;
}; };
/* Server auto discovery */ /* Server auto discovery */