Hi,
I'm looking for advice on the best way to use BaseX in a highly
concurrent environment. We're using the server API to talk to a
BaseX 6.6 server and are seeing some very odd behavior when 100+ threads
use ClientSession concurrently to access the same db.
Looking at the BaseX logs I get some correct lines like:
01:19:21.101 [127.0.0.1:33153] OPEN
Async_StressTestDB_Med
01:19:22.301 [127.0.0.1:33274] XQUERY insert node
<record><contentChannel><String>data2</String></contentChannel><String>v
alue goes here618</String></record> into for $db in
collection('Async_StressTestDB_Med')/records return $db
But then I also get lines like:
01:19:21.105 [127.0.0.1:33161] OPEONP EANs
yAnscy_nSct_rSetsrseTsessTteDsBt_DMBe_dM Error: Stopped at line 1,
column 6: Unknown command 'OPEONP'; try "help".
01:19:21.112 [127.0.0.1:33156] OOPPEENN
AAssyynncc__SSttrreessssTTeessttDDBB__MMeedd Error: Stopped at line 1,
column 8: Unknown command 'OOPPEENN'; try "help".
01:19:23.065 [127.0.0.1:33642] XQXERY insYrt ninserde
<tr ecord><code
nrecord><ctnteneChanntChannel><etl><Strirg>data2i/Strg>datng><a/2content
C/Striang>nnel><Stcintenng>value Channeloes hereString>va<uStrine><
goecord>esn hert152o foring></rec rd>$db in collectiono'Async_
tressTestDor ddb in 'ollec/recoion('As nc_SteressTestDu_Med')/n
$dbecords return $db Error: Stopped at line 1, column 6: Unknown
command 'xqxery'; did you mean 'xquery'?
The issue seems to happen when calling clientSession.execute(...),
because I can verify that the command string passed to execute is
correct even when the log shows a mangled command.
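To illustrate what I suspect is going on, here is a minimal sketch that does not use BaseX at all. SharedChannel, sendUnsafe, sendSafe and fire are hypothetical names I made up; the channel just stands in for one connection that several threads write to. Without a lock around the whole command, bytes from different callers can interleave in the same way as the 'OOPPEENN' log lines; with the lock, each command arrives intact.

```scala
import java.io.ByteArrayOutputStream

object InterleaveSketch {
  // Hypothetical stand-in for a session: every command is written to one shared stream.
  final class SharedChannel {
    private val out = new ByteArrayOutputStream()
    private val lock = new Object

    // Writes byte by byte with no lock held across the command,
    // so bytes from concurrent callers may interleave.
    def sendUnsafe(cmd: String): Unit =
      for (b <- (cmd + "\n").getBytes("UTF-8")) {
        out.write(b.toInt)
        Thread.`yield`()
      }

    // Holds one lock for the whole command, so commands cannot interleave.
    def sendSafe(cmd: String): Unit = lock.synchronized {
      for (b <- (cmd + "\n").getBytes("UTF-8")) {
        out.write(b.toInt)
        Thread.`yield`()
      }
    }

    // Everything received so far, split into one entry per line.
    def lines: Seq[String] =
      new String(out.toByteArray, "UTF-8").split("\n").toSeq
  }

  // Starts `threads` threads that each send the same OPEN command, then waits for all of them.
  def fire(threads: Int, send: String => Unit): Unit = {
    val ts = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = send("OPEN Async_StressTestDB_Med")
      })
    }
    ts.foreach(_.start())
    ts.foreach(_.join())
  }
}
```

Calling InterleaveSketch.fire(8, channel.sendUnsafe) on a fresh channel may or may not mangle the lines on a given run, since it depends on thread scheduling; with channel.sendSafe every line should come out as the original command.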
I believe we're hitting two problems:
1. A collection in ClientSession is not thread safe when storing
the commands passed to it.
2. BaseX is write locking the db on concurrent inserts. I
understand the need for that, but the wiki page on Transaction Management
implied that there is a queue where requests wait if isolation
cannot be guaranteed. This sounded automatic. Are there any steps I
need to take to enable the Transaction Management queue?
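One workaround I could try for problem 1, assuming a session is simply not meant to be shared between threads (I have not confirmed this), is to give each thread its own session. A minimal sketch of that idea follows; PerThread is a hypothetical helper, not part of BaseX, and it only wraps java.lang.ThreadLocal. It does nothing about problem 2.

```scala
object PerThreadSketch {
  // Hypothetical helper: lazily builds one instance of A per calling thread.
  final class PerThread[A <: AnyRef](make: () => A) {
    private val local = new ThreadLocal[A] {
      // Runs once per thread, the first time that thread calls get.
      override def initialValue(): A = make()
    }

    // Returns the calling thread's own instance.
    def get: A = local.get()
  }
}
```

In the store below this would be used roughly as new PerThread(() => new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd)), with insertUpdate calling get on it instead of the shared clientSession. Sessions created this way are never closed in the sketch, so a real version would need some cleanup.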
Below is the JUnit 4 test setup we're using to reproduce the errors.
I generally run one test at a time to observe it and limit side effects. The
BaseX GUI is closed while the tests run. I've also attached the full
BaseX logs generated by running the tests; if the attachment gets removed I can
easily email them on request.
How can these problems be resolved?
Thanks,
Jason
import org.junit._
import Assert._
import scala.util.Random
import scala.concurrent.ops._
import scala.actors.Actor._
import org.basex.core.BaseXException
import org.basex.core.cmd._
import org.basex.server.ClientSession
import scala.xml._
class MockBaseXXMLStore
{
  val dbHost = "localhost"
  val dbPort = "1984"
  val dbUser = "admin"
  val dbPwd = "admin"
  var _clientSession: Option[ClientSession] = None
  // Lazily creates one session and caches it; all callers of this store share it.
  def clientSession: ClientSession =
  {
    _clientSession match {
      case Some(cs) => cs
      case None => {
        val cs = new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd)
        _clientSession = Some(cs)
        cs
      }
    }
  }
  // Opens the db and inserts one record under /records.
  def insertUpdate(xmlCollStr: String)(xmlElemStr: String): Unit =
  {
    val srvrRspStrm = new java.io.ByteArrayOutputStream()
    try {
      clientSession.setOutputStream(srvrRspStrm)
      clientSession.execute(new Open(xmlCollStr))
      val insertTemplate =
        (
          "insert node %NODE% into "
          + "for $db in collection('%COLLNAME%')/records return $db"
        )
      val insertQry =
        insertTemplate.replace(
          "%NODE%",
          xmlElemStr
        ).replace(
          "%COLLNAME%",
          xmlCollStr
        )
      try {
        clientSession.execute(new XQuery(insertQry))
      }
      catch {
        // Note: a failed insert is silently ignored here.
        case e: BaseXException => {
        }
      }
    }
    catch {
      // Open failed (e.g. the db does not exist yet): create it with this
      // record as its first entry.
      case e: BaseXException => {
        val recordsElem = <records>
          {XML.loadString(xmlElemStr)}
          </records>
        clientSession.execute(new CreateDB(xmlCollStr))
        clientSession.execute(new Add(recordsElem.toString))
      }
    }
    finally {
      srvrRspStrm.close()
    }
  }
}
class BaseXTest
{
  val rand = new Random()
  @Test
  //Success: 1 created
  def insertUpdate =
  {
    val db = new MockBaseXXMLStore
    db.insertUpdate("MockBaseXXMLStoreTestDB")(
      "<record><contentChannel><String>data2</String></contentChannel>" +
      "<String>value goes here" + rand.nextInt + "</String></record>")
  }
  @Test
  //Success: 100 and 1000 created
  def stressInsertUpdate =
  {
    val db = new MockBaseXXMLStore
    println("starting small")
    for (i <- 1 to 100) {
      db.insertUpdate("StressTestDB_Small")(
        "<record><contentChannel><String>data2</String></contentChannel>" +
        "<String>value goes here" + i + "</String></record>")
    }
    println("starting med")
    for (i <- 1 to 1000) {
      db.insertUpdate("StressTestDB_Med")(
        "<record><contentChannel><String>data2</String></contentChannel>" +
        "<String>value goes here" + i + "</String></record>")
    }
  }
  @Test
  //Fail: 0 and 184 created
  def stressInsertUpdateAsync =
  {
    val db = new MockBaseXXMLStore
    println("async starting small")
    for (i <- 1 to 100) {
      spawn {
        db.insertUpdate("Async_StressTestDB_Small")(
          "<record><contentChannel><String>data2</String></contentChannel>" +
          "<String>value goes here" + i + "</String></record>")
      }
    }
    println("async starting med")
    for (i <- 1 to 1000) {
      spawn {
        db.insertUpdate("Async_StressTestDB_Med")(
          "<record><contentChannel><String>data2</String></contentChannel>" +
          "<String>value goes here" + i + "</String></record>")
      }
    }
  }
  @Test
  //fail 4 and 0 created
  def stressInsertUpdateActor =
  {
    val db = new MockBaseXXMLStore
    println("async starting small")
    for (i <- 1 to 100) {
      actor {
        db.insertUpdate("Actor_StressTestDB_Small")(
          "<record><contentChannel><String>data2</String></contentChannel>" +
          "<String>value goes here" + i + "</String></record>")
      }
    }
    println("async starting med")
    for (i <- 1 to 1000) {
      actor {
        db.insertUpdate("Actor_StressTestDB_Med")(
          "<record><contentChannel><String>data2</String></contentChannel>" +
          "<String>value goes here" + i + "</String></record>")
      }
    }
  }
}