Hi,

 

I’m looking for advice on the best way to use BaseX in a highly concurrent environment.  We’re using the server API to talk to a BaseX 6.6 server and are seeing some very odd behavior when 100+ threads use ClientSession concurrently to access the same database.

 

Looking at the BaseX logs I get some correct lines like:

 

01:19:21.101       [127.0.0.1:33153]              OPEN Async_StressTestDB_Med

01:19:22.301       [127.0.0.1:33274]              XQUERY insert node <record><contentChannel><String>data2</String></contentChannel><String>value goes here618</String></record> into for $db in collection('Async_StressTestDB_Med')/records return $db

 

But then I also get lines like:

 

01:19:21.105       [127.0.0.1:33161]              OPEONP EANs yAnscy_nSct_rSetsrseTsessTteDsBt_DMBe_dM Error: Stopped at line 1, column 6: Unknown command 'OPEONP'; try "help".

01:19:21.112       [127.0.0.1:33156]              OOPPEENN AAssyynncc__SSttrreessssTTeessttDDBB__MMeedd Error: Stopped at line 1, column 8: Unknown command 'OOPPEENN'; try "help".

01:19:23.065       [127.0.0.1:33642]              XQXERY insYrt ninserde <tr ecord><code nrecord><ctnteneChanntChannel><etl><Strirg>data2i/Strg>datng><a/2contentC/Striang>nnel><Stcintenng>value Channeloes hereString>va<uStrine>< goecord>esn hert152o foring></rec rd>$db in collectiono'Async_ tressTestDor ddb in 'ollec/recoion('As nc_SteressTestDu_Med')/n $dbecords return $db  Error: Stopped at line 1, column 6: Unknown command 'xqxery'; did you mean 'xquery'?

 

The issue seems to occur inside clientSession.execute(…): I can verify that the command string passed to execute is correct even when the log shows a mangled command.
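
To illustrate what I suspect is going on, here is a minimal, self-contained sketch. It does not use BaseX at all: SharedChannel is a hypothetical stand-in for a single connection, and the assumption (which I have not confirmed against the BaseX source) is that a command is written to the connection piece by piece with no lock held. Without a lock, characters from different threads can interleave much like the "OOPPEENN" lines above; with a lock around each command, every line arrives intact.

```scala
object InterleaveDemo {
  // Hypothetical stand-in for one shared connection: a command is appended
  // one character at a time, roughly the way bytes go onto a socket.
  final class SharedChannel {
    // StringBuffer is thread-safe per append, but not per whole command.
    private val buf = new java.lang.StringBuffer
    def send(cmd: String): Unit = {
      cmd.foreach { c => buf.append(c); Thread.`yield`() }
      buf.append('\n')
    }
    def lines: Seq[String] = buf.toString.split("\n").toSeq
  }

  // Starts n threads that each send cmd once, optionally holding a lock
  // for the duration of the whole command.
  def sendAll(n: Int, cmd: String, locked: Boolean): Seq[String] = {
    val ch = new SharedChannel
    val threads = (1 to n).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          if (locked) ch.synchronized { ch.send(cmd) } else ch.send(cmd)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    ch.lines
  }

  def main(args: Array[String]): Unit = {
    // Unlocked: output varies from run to run and may be mangled.
    println(sendAll(4, "OPEN Async_StressTestDB_Med", locked = false))
    // Locked: every line is a complete command.
    println(sendAll(4, "OPEN Async_StressTestDB_Med", locked = true))
  }
}
```

If this is the cause, wrapping each call in my test in a synchronized block on the shared session should make the mangling disappear, at the cost of serialising all requests.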

 

I believe we are hitting two problems.

1.       Some collection inside ClientSession is not thread-safe when storing the commands passed to it.

2.       BaseX is write-locking the database on concurrent inserts.  I understand the need for that, but the wiki page on Transaction Management implied that there is a queue in which requests wait if isolation cannot be guaranteed.  This sounded automatic.  Are there any steps I need to take to enable the Transaction Management queue?
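
If point 1 is right, the workaround I would try is one session per thread instead of one shared session. Below is a minimal sketch of that idea. PerThread is a helper I made up for illustration, and FakeSession is a hypothetical stand-in for org.basex.server.ClientSession so that the sketch runs without a server; in the real test the factory would presumably be () => new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd).

```scala
// Hypothetical stand-in for ClientSession; it only remembers which thread
// created it so the effect is visible in the output.
final class FakeSession {
  val owner: String = Thread.currentThread().getName
  def execute(cmd: String): String = "[" + owner + "] " + cmd
}

// One instance per thread: each thread lazily creates its own value on
// first access, so no two threads ever share a connection.
final class PerThread[A](create: () => A) {
  private val local = new ThreadLocal[A] {
    override def initialValue(): A = create()
  }
  def get: A = local.get()
}

object PerThreadDemo {
  def main(args: Array[String]): Unit = {
    val sessions = new PerThread(() => new FakeSession)
    val threads = (1 to 4).map { i =>
      new Thread(new Runnable {
        def run(): Unit =
          println(sessions.get.execute("OPEN Async_StressTestDB_Med"))
      }, "worker-" + i)
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```

With 100+ threads this would open 100+ connections, so a bounded pool of sessions may be the more practical variant; I have not measured either.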

 

Below is the JUnit 4 test setup we’re using to reproduce the errors.  I generally run one test at a time to observe the behavior and limit side effects.  The BaseX GUI is closed while the tests run.  I’ve also attached the full BaseX logs generated by running the tests; if the attachment gets stripped, I can email them on request.

 

How can the problem be resolved?

 

Thanks,

Jason

 

 

import org.junit._

import Assert._

import scala.util.Random

import scala.concurrent.ops._

import scala.actors.Actor._

import org.basex.core.BaseXException

import org.basex.core.cmd._

import org.basex.server.ClientSession

import scala.xml._

 

class MockBaseXXMLStore

{

 

  val dbHost = "localhost"

  val dbPort = "1984"

  val dbUser = "admin"

  val dbPwd = "admin"

 

  var _clientSession: Option[ClientSession] = None

 

  def clientSession: ClientSession =

  {

    _clientSession match {

      case Some(cs) => cs

      case None => {

        val cs = new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd)

        _clientSession = Some(cs)

        cs

      }

    }

  }

 

  def insertUpdate(xmlCollStr: String)(xmlElemStr: String): Unit =

  {

    val srvrRspStrm = new java.io.ByteArrayOutputStream()

    try {

      clientSession.setOutputStream(srvrRspStrm)

      clientSession.execute(new Open(xmlCollStr))

 

      val insertTemplate =

        (

          "insert node %NODE% into "

            + "for $db in collection('%COLLNAME%')/records return $db"

          );

 

      val insertQry =

        insertTemplate.replace(

          "%NODE%",

          xmlElemStr

        ).replace(

          "%COLLNAME%",

          xmlCollStr

        )

 

      try {

        clientSession.execute(new XQuery(insertQry))

      }

      catch {

        case e: BaseXException => {

        }

      }

    }

    catch {

      case e: BaseXException => {

        val recordsElem = <records>

          {XML.loadString(xmlElemStr)}

        </records>

        clientSession.execute(new CreateDB(xmlCollStr))

        clientSession.execute(new Add(recordsElem.toString))

      }

    }

    finally {

      srvrRspStrm.close()

    }

  }

}

 

class BaseXTest

{

  val rand = new Random()

 

  @Test

  //Success: 1 created

  def insertUpdate =

  {

    val db = new MockBaseXXMLStore

    db.insertUpdate("MockBaseXXMLStoreTestDB")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + rand.nextInt + "</String></record>")

  }

 

  @Test

  //Success: 100 and 1000 created

  def stressInsertUpdate =

  {

    val db = new MockBaseXXMLStore

    println("starting small")

    for (i <- 1 to 100) {

      db.insertUpdate("StressTestDB_Small")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")

    }

    println("starting med")

    for (i <- 1 to 1000) {

      db.insertUpdate("StressTestDB_Med")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")

    }

  }

 

  @Test

  //Fail: 0 and 184 created

  def stressInsertUpdateAsync =

  {

    val db = new MockBaseXXMLStore

    println("async starting small")

    for (i <- 1 to 100) {

      spawn {

        db.insertUpdate("Async_StressTestDB_Small")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")

      }

    }

    println("async starting med")

    for (i <- 1 to 1000) {

      spawn {

        db.insertUpdate("Async_StressTestDB_Med")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")

      }

    }

  }

 

  @Test

  //fail 4 and 0 created

  def stressInsertUpdateActor =

  {

    val db = new MockBaseXXMLStore

    println("async starting small")

    for (i <- 1 to 100) {

      actor {

        db.insertUpdate("Actor_StressTestDB_Small")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")

      }

    }

    println("async starting med")

    for (i <- 1 to 1000) {

      actor {

        db.insertUpdate("Actor_StressTestDB_Med")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")

      }

    }

  }

 

}