Skip to content

Commit bd2df88

Browse files
committed
fi: postgres protocol
Fixed issue #668
1 parent 2ed2162 commit bd2df88

File tree

6 files changed

+602
-21
lines changed

6 files changed

+602
-21
lines changed

postgresw/src/main/java/com/arcadedb/postgres/PostgresNetworkExecutor.java

Lines changed: 219 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,13 @@ private void syncCommand() {
233233
writeReadyForQueryMessage();
234234
}
235235

236-
private void flushCommand() {
236+
private void flushCommand() throws IOException {
237237
if (DEBUG)
238238
LogManager.instance().log(this, Level.INFO, "PSQL: flush (thread=%s)", Thread.currentThread().threadId());
239-
writeReadyForQueryMessage();
239+
// Flush message does NOT generate any response according to PostgreSQL protocol.
240+
// It just forces the backend to deliver any data pending in its output buffers.
241+
// See: https://www.postgresql.org/docs/current/protocol-message-formats.html
242+
channel.flush();
240243
}
241244

242245
private void closeCommand() throws IOException {
@@ -283,14 +286,32 @@ private void describeCommand() throws IOException {
283286
portal.cachedResultSet = browseAndCacheResultSet(resultSet, 0);
284287
portal.columns = getColumns(portal.cachedResultSet);
285288
writeRowDescription(portal.columns);
289+
portal.rowDescriptionSent = true;
286290
} else
287291
writeNoData();
288292
} else {
289-
if (portal.columns != null)
293+
if (portal.columns != null) {
290294
writeRowDescription(portal.columns);
295+
portal.rowDescriptionSent = true;
296+
}
291297
}
292298
} else if (type == 'S') {
293-
writeNoData();
299+
// Describe Statement: send ParameterDescription followed by RowDescription/NoData
300+
// This tells the client how many parameters the prepared statement expects
301+
writeParameterDescription(portal);
302+
303+
// Now send RowDescription or NoData
304+
// For SELECT queries, we need to determine the columns from the type schema
305+
if (portal.isExpectingResult && portal.columns == null) {
306+
portal.columns = getColumnsFromQuerySchema(portal.query);
307+
}
308+
309+
if (portal.columns != null && !portal.columns.isEmpty()) {
310+
writeRowDescription(portal.columns);
311+
portal.rowDescriptionSent = true;
312+
} else {
313+
writeNoData();
314+
}
294315
} else
295316
throw new PostgresProtocolException("Unexpected describe type '" + type + "'");
296317
}
@@ -328,23 +349,40 @@ private void executeCommand() {
328349
portal.executed = true;
329350
if (portal.isExpectingResult) {
330351
portal.cachedResultSet = browseAndCacheResultSet(resultSet, limit);
331-
portal.columns = getColumns(portal.cachedResultSet);
332-
writeRowDescription(portal.columns);
352+
// Only send RowDescription if not already sent during DESCRIBE
353+
// But always use columns from actual result for DataRows consistency
354+
if (!portal.rowDescriptionSent) {
355+
portal.columns = getColumns(portal.cachedResultSet);
356+
writeRowDescription(portal.columns);
357+
portal.rowDescriptionSent = true;
358+
}
333359
}
334360
}
335361

336362
if (portal.isExpectingResult) {
337-
if (portal.columns == null)
338-
portal.columns = getColumns(portal.cachedResultSet);
363+
// Always use columns from actual result to ensure DataRows match
364+
final Map<String, PostgresType> dataRowColumns = getColumns(portal.cachedResultSet);
365+
366+
// Verify column count matches what was sent in RowDescription
367+
if (portal.columns != null && portal.columns.size() != dataRowColumns.size()) {
368+
// Column count mismatch - use the original columns from DESCRIBE
369+
// This can happen if sample query returned different properties than actual query
370+
if (DEBUG)
371+
LogManager.instance().log(this, Level.WARNING,
372+
"PSQL: Column count mismatch - RowDesc=%d, DataRow=%d (thread=%s)",
373+
portal.columns.size(), dataRowColumns.size(), Thread.currentThread().threadId());
374+
}
339375

340-
writeDataRows(portal.cachedResultSet, portal.columns);
376+
// Use the columns that were sent in RowDescription for consistency
377+
final Map<String, PostgresType> columnsToUse = portal.columns != null ? portal.columns : dataRowColumns;
378+
writeDataRows(portal.cachedResultSet, columnsToUse);
341379
writeCommandComplete(portal.query, portal.cachedResultSet == null ? 0 : portal.cachedResultSet.size());
342380
} else
343381
writeNoData();
344382
}
345383
} catch (final CommandParsingException e) {
346384
setErrorInTx();
347-
writeError(ERROR_SEVERITY.ERROR, "Syntax error on executing query: " + e.getCause().getMessage(), "42601");
385+
writeError(ERROR_SEVERITY.ERROR, "Syntax error on executing query: " + (e.getCause() != null ? e.getCause().getMessage() : e.getMessage()), "42601");
348386
} catch (final Exception e) {
349387
setErrorInTx();
350388
writeError(ERROR_SEVERITY.ERROR, "Error on executing query: " + e.getMessage(), "XX000");
@@ -402,7 +440,7 @@ else if (query.query.equalsIgnoreCase("BEGIN") ||
402440

403441
} catch (final CommandParsingException e) {
404442
setErrorInTx();
405-
writeError(ERROR_SEVERITY.ERROR, "Syntax error on executing query: " + e.getCause().getMessage(), "42601");
443+
writeError(ERROR_SEVERITY.ERROR, "Syntax error on executing query: " + (e.getCause() != null ? e.getCause().getMessage() : e.getMessage()), "42601");
406444
} catch (final Exception e) {
407445
setErrorInTx();
408446
writeError(ERROR_SEVERITY.ERROR, "Error on executing query: " + e.getMessage(), "XX000");
@@ -504,6 +542,87 @@ private Map<String, PostgresType> getColumns(final List<Result> resultSet) {
504542
return columns;
505543
}
506544

545+
/**
546+
* Extract column schema from a SELECT query by parsing the type name and querying for a sample row.
547+
* This is used during DESCRIBE Statement to return RowDescription before the query is executed.
548+
* ArcadeDB is schema-less so we need to query actual data to discover dynamically-added properties.
549+
*/
550+
private Map<String, PostgresType> getColumnsFromQuerySchema(final String query) {
551+
if (query == null || query.isEmpty()) {
552+
return null;
553+
}
554+
555+
// Try to extract the type name from the query
556+
// Patterns: "SELECT FROM TypeName", "SELECT * FROM TypeName", "SELECT ... FROM TypeName"
557+
final String upperQuery = query.toUpperCase();
558+
final int fromIndex = upperQuery.indexOf(" FROM ");
559+
if (fromIndex < 0) {
560+
return null;
561+
}
562+
563+
String afterFrom = query.substring(fromIndex + 6).trim();
564+
565+
// Extract type name (ends at WHERE, LIMIT, ORDER, or end of string)
566+
String typeName = afterFrom;
567+
for (String terminator : new String[]{" WHERE ", " LIMIT ", " ORDER ", " GROUP ", ";"}) {
568+
final int idx = typeName.toUpperCase().indexOf(terminator);
569+
if (idx > 0) {
570+
typeName = typeName.substring(0, idx);
571+
}
572+
}
573+
typeName = typeName.trim();
574+
575+
// Skip schema: prefix if present
576+
if (typeName.toLowerCase().startsWith("schema:")) {
577+
return null; // Schema queries have different structure
578+
}
579+
580+
try {
581+
// First verify the type exists
582+
final DocumentType docType = database.getSchema().getType(typeName);
583+
if (docType == null) {
584+
return null;
585+
}
586+
587+
// Query for a sample row to discover all properties (including dynamically-added ones)
588+
// Use LIMIT 1 to minimize overhead
589+
final String sampleQuery = "SELECT FROM " + typeName + " LIMIT 1";
590+
final ResultSet resultSet = database.query("sql", sampleQuery, server.getConfiguration());
591+
final List<Result> sampleRows = browseAndCacheResultSet(resultSet, 1);
592+
593+
if (!sampleRows.isEmpty()) {
594+
// Use the sample row to discover columns
595+
return getColumns(sampleRows);
596+
}
597+
598+
// If no rows exist, fall back to schema-defined properties
599+
final Map<String, PostgresType> columns = new LinkedHashMap<>();
600+
601+
// Add system properties first (these are returned for document/vertex types)
602+
columns.put(RID_PROPERTY, PostgresType.VARCHAR);
603+
columns.put(TYPE_PROPERTY, PostgresType.VARCHAR);
604+
columns.put(CAT_PROPERTY, PostgresType.CHAR);
605+
606+
// Add all defined properties from the type
607+
for (final String propName : docType.getPropertyNames()) {
608+
final com.arcadedb.schema.Property prop = docType.getProperty(propName);
609+
if (prop != null && prop.getType() != null) {
610+
columns.put(propName, PostgresType.getTypeFromArcade(prop.getType()));
611+
} else {
612+
columns.put(propName, PostgresType.VARCHAR);
613+
}
614+
}
615+
616+
return columns;
617+
618+
} catch (Exception e) {
619+
if (DEBUG)
620+
LogManager.instance().log(this, Level.WARNING, "PSQL: failed to get columns from schema for query '%s': %s",
621+
query, e.getMessage());
622+
return null;
623+
}
624+
}
625+
507626
private void writeRowDescription(final Map<String, PostgresType> columns) {
508627
if (columns == null)
509628
return;
@@ -618,7 +737,13 @@ private void bindCommand() {
618737
final String portalName = readString();
619738
final String sourcePreparedStatement = readString();
620739

621-
final PostgresPortal portal = getPortal(portalName, false);
740+
// Look up the prepared statement (stored during PARSE)
741+
// The portal name may be different (often empty for unnamed portal)
742+
PostgresPortal portal = getPortal(sourcePreparedStatement, false);
743+
if (portal == null) {
744+
// Try with portal name as fallback for backwards compatibility
745+
portal = getPortal(portalName, false);
746+
}
622747
if (portal == null) {
623748
writeMessage("bind complete", null, '2', 4);
624749
return;
@@ -630,6 +755,9 @@ private void bindCommand() {
630755
Thread.currentThread().threadId());
631756

632757
final int paramFormatCount = channel.readShort();
758+
if (DEBUG)
759+
LogManager.instance().log(this, Level.INFO, "PSQL: bind paramFormatCount=%d (thread=%s)",
760+
paramFormatCount, Thread.currentThread().threadId());
633761
if (paramFormatCount > 0) {
634762
portal.parameterFormats = new ArrayList<>(paramFormatCount);
635763
for (int i = 0; i < paramFormatCount; i++) {
@@ -639,12 +767,21 @@ private void bindCommand() {
639767
}
640768

641769
final int paramValuesCount = channel.readShort();
770+
if (DEBUG)
771+
LogManager.instance().log(this, Level.INFO, "PSQL: bind paramValuesCount=%d (thread=%s)",
772+
paramValuesCount, Thread.currentThread().threadId());
642773
if (paramValuesCount > 0) {
643774
portal.parameterValues = new ArrayList<>(paramValuesCount);
644775
for (int i = 0; i < paramValuesCount; i++) {
776+
if (DEBUG)
777+
LogManager.instance().log(this, Level.INFO, "PSQL: bind reading param %d size (thread=%s)", i, Thread.currentThread().threadId());
645778
final long paramSize = channel.readUnsignedInt();
779+
if (DEBUG)
780+
LogManager.instance().log(this, Level.INFO, "PSQL: bind param %d size=%d (thread=%s)", i, paramSize, Thread.currentThread().threadId());
646781
final byte[] paramValue = new byte[(int) paramSize];
647782
channel.readBytes(paramValue);
783+
if (DEBUG)
784+
LogManager.instance().log(this, Level.INFO, "PSQL: bind param %d value read (thread=%s)", i, Thread.currentThread().threadId());
648785

649786
// Determine format code according to PostgreSQL protocol:
650787
// - If paramFormatCount == 0: all parameters use text format (0)
@@ -664,10 +801,17 @@ private void bindCommand() {
664801
? portal.parameterTypes.get(i)
665802
: 0L; // UNSPECIFIED type
666803

804+
if (DEBUG)
805+
LogManager.instance().log(this, Level.INFO, "PSQL: bind deserializing param %d typeCode=%d formatCode=%d (thread=%s)",
806+
i, typeCode, formatCode, Thread.currentThread().threadId());
667807
portal.parameterValues.add(PostgresType.deserialize(typeCode, formatCode, paramValue));
808+
if (DEBUG)
809+
LogManager.instance().log(this, Level.INFO, "PSQL: bind param %d deserialized (thread=%s)", i, Thread.currentThread().threadId());
668810
}
669811
}
670812

813+
if (DEBUG)
814+
LogManager.instance().log(this, Level.INFO, "PSQL: bind reading resultFormatCount (thread=%s)", Thread.currentThread().threadId());
671815
final int resultFormatCount = channel.readShort();
672816
if (resultFormatCount > 0) {
673817
portal.resultFormats = new ArrayList<>(resultFormatCount);
@@ -680,6 +824,16 @@ private void bindCommand() {
680824
if (errorInTransaction)
681825
return;
682826

827+
// Store the portal under the portal name (which may be empty for unnamed portal)
828+
// This is necessary because EXECUTE looks up portals by portal name, not prepared statement name
829+
// PostgreSQL protocol: PARSE creates "prepared statement", BIND creates "portal" from it
830+
if (!portalName.equals(sourcePreparedStatement)) {
831+
portals.put(portalName, portal);
832+
if (DEBUG)
833+
LogManager.instance().log(this, Level.INFO, "PSQL: bind stored portal under name '%s' (thread=%s)",
834+
portalName, Thread.currentThread().threadId());
835+
}
836+
683837
writeMessage("bind complete", null, '2', 4);
684838

685839
} catch (final Exception e) {
@@ -704,12 +858,25 @@ private void parseCommand() {
704858
final long param = channel.readUnsignedInt();
705859
portal.parameterTypes.add(param);
706860
}
861+
} else {
862+
// Client sent paramCount=0 (e.g., asyncpg, node-postgres)
863+
// Detect $N placeholders in the query to determine actual parameter count
864+
final int detectedParams = detectParameterPlaceholders(query.query);
865+
if (detectedParams > 0) {
866+
portal.parameterTypes = new ArrayList<>(detectedParams);
867+
for (int i = 0; i < detectedParams; i++) {
868+
// Use VARCHAR (OID 1043) as default type instead of 0 (unspecified)
869+
// This prevents asyncpg from trying to introspect unknown types via pg_type
870+
portal.parameterTypes.add((long) PostgresType.VARCHAR.code);
871+
}
872+
}
707873
}
708874

875+
final int actualParamCount = portal.parameterTypes != null ? portal.parameterTypes.size() : 0;
709876
if (DEBUG)
710877
LogManager.instance()
711-
.log(this, Level.INFO, "PSQL: parse (portal=%s) -> %s (params=%d) (errorInTransaction=%s thread=%s)", portalName,
712-
portal.query, paramCount, errorInTransaction, Thread.currentThread().threadId());
878+
.log(this, Level.INFO, "PSQL: parse (portal=%s) -> %s (params=%d, detected=%d) (errorInTransaction=%s thread=%s)",
879+
portalName, portal.query, paramCount, actualParamCount, errorInTransaction, Thread.currentThread().threadId());
713880

714881
if (errorInTransaction)
715882
return;
@@ -895,7 +1062,7 @@ private void parseCommand() {
8951062

8961063
} catch (final CommandParsingException e) {
8971064
setErrorInTx();
898-
writeError(ERROR_SEVERITY.ERROR, "Syntax error on parsing query: " + e.getCause().getMessage(), "42601");
1065+
writeError(ERROR_SEVERITY.ERROR, "Syntax error on parsing query: " + (e.getCause() != null ? e.getCause().getMessage() : e.getMessage()), "42601");
8991066
} catch (final Exception e) {
9001067
setErrorInTx();
9011068
writeError(ERROR_SEVERITY.ERROR, "Error on parsing query: " + e.getMessage(), "XX000");
@@ -1215,6 +1382,43 @@ private void writeNoData() {
12151382
writeMessage("no data", null, 'n', 4);
12161383
}
12171384

1385+
/**
1386+
* Writes ParameterDescription message ('t') describing the parameters of a prepared statement.
1387+
* This is required by the PostgreSQL extended query protocol for DESCRIBE 'S' (Statement).
1388+
*/
1389+
private void writeParameterDescription(final PostgresPortal portal) {
1390+
final int paramCount = portal.parameterTypes != null ? portal.parameterTypes.size() : 0;
1391+
// Message format: 't' + int32 length + int16 param count + int32[] type OIDs
1392+
final int messageLength = 4 + 2 + (paramCount * 4);
1393+
1394+
writeMessage("parameter description", () -> {
1395+
channel.writeShort((short) paramCount);
1396+
if (portal.parameterTypes != null) {
1397+
for (final Long typeOid : portal.parameterTypes) {
1398+
channel.writeUnsignedInt(typeOid != null ? typeOid.intValue() : 0); // 0 = unspecified type
1399+
}
1400+
}
1401+
}, 't', messageLength);
1402+
}
1403+
1404+
/**
1405+
* Detects $N style parameter placeholders in a query and returns the count.
1406+
* PostgreSQL uses $1, $2, etc. for positional parameters.
1407+
* Returns the highest parameter number found (e.g., "$3" returns 3).
1408+
*/
1409+
private int detectParameterPlaceholders(final String query) {
1410+
int maxParam = 0;
1411+
final Pattern pattern = Pattern.compile("\\$(\\d+)");
1412+
final Matcher matcher = pattern.matcher(query);
1413+
while (matcher.find()) {
1414+
final int paramNum = Integer.parseInt(matcher.group(1));
1415+
if (paramNum > maxParam) {
1416+
maxParam = paramNum;
1417+
}
1418+
}
1419+
return maxParam;
1420+
}
1421+
12181422
private PostgresPortal getPortal(final String name, final boolean remove) {
12191423
if (remove)
12201424
return portals.remove(name);

postgresw/src/main/java/com/arcadedb/postgres/PostgresPortal.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ public class PostgresPortal {
3434
public List<Integer> resultFormats;
3535
public List<Result> cachedResultSet;
3636
public Map<String, PostgresType> columns;
37-
public boolean ignoreExecution = false;
37+
public boolean ignoreExecution = false;
3838
public boolean isExpectingResult;
39-
public boolean executed = false;
39+
public boolean executed = false;
40+
public boolean rowDescriptionSent = false;
4041

4142
public PostgresPortal(final String query, String language) {
4243
this.query = query;

0 commit comments

Comments
 (0)